This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bca1e82b49 add unit test for QueryServer (#12599)
bca1e82b49 is described below
commit bca1e82b49138f6c04a7c4870fdadb66d269bbe7
Author: sullis <[email protected]>
AuthorDate: Fri Mar 8 10:22:24 2024 -0800
add unit test for QueryServer (#12599)
---
.../apache/pinot/core/transport/QueryServer.java | 28 ++++----
.../pinot/core/transport/QueryServerTest.java | 79 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 15 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 915b2eb1f2..0e1c7d1aa0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.core.transport;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -59,7 +59,7 @@ public class QueryServer {
private final EventLoopGroup _workerGroup;
private final Class<? extends ServerSocketChannel> _channelClass;
private final ChannelHandler _instanceRequestHandler;
- private Channel _channel;
+ private ServerSocketChannel _channel;
/**
* Create an unsecured server instance
@@ -85,16 +85,12 @@ public class QueryServer {
boolean enableNativeTransports = nettyConfig != null &&
nettyConfig.isNativeTransportsEnabled();
OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
- if (enableNativeTransports
- && operatingSystemType == OsCheck.OSType.Linux
- && Epoll.isAvailable()) {
+ if (enableNativeTransports && operatingSystemType == OsCheck.OSType.Linux
&& Epoll.isAvailable()) {
_bossGroup = new EpollEventLoopGroup();
_workerGroup = new EpollEventLoopGroup();
_channelClass = EpollServerSocketChannel.class;
LOGGER.info("Using Epoll event loop");
- } else if (enableNativeTransports
- && operatingSystemType == OsCheck.OSType.MacOS
- && KQueue.isAvailable()) {
+ } else if (enableNativeTransports && operatingSystemType ==
OsCheck.OSType.MacOS && KQueue.isAvailable()) {
_bossGroup = new KQueueEventLoopGroup();
_workerGroup = new KQueueEventLoopGroup();
_channelClass = KQueueServerSocketChannel.class;
@@ -104,11 +100,9 @@ public class QueryServer {
_workerGroup = new NioEventLoopGroup();
_channelClass = NioServerSocketChannel.class;
StringBuilder log = new StringBuilder("Using NIO event loop");
- if (operatingSystemType == OsCheck.OSType.Linux
- && enableNativeTransports) {
+ if (operatingSystemType == OsCheck.OSType.Linux &&
enableNativeTransports) {
log.append(", as Epoll is not available:
").append(Epoll.unavailabilityCause());
- } else if (operatingSystemType == OsCheck.OSType.MacOS
- && enableNativeTransports) {
+ } else if (operatingSystemType == OsCheck.OSType.MacOS &&
enableNativeTransports) {
log.append(", as KQueue is not available:
").append(KQueue.unavailabilityCause());
}
LOGGER.info(log.toString());
@@ -130,10 +124,9 @@ public class QueryServer {
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
metric::normalCacheSize);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_THREADLOCALCACHE,
metric::numThreadLocalCaches);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE,
metric::chunkSize);
- _channel = serverBootstrap.group(_bossGroup,
_workerGroup).channel(_channelClass)
+ _channel = (ServerSocketChannel) serverBootstrap.group(_bossGroup,
_workerGroup).channel(_channelClass)
.option(ChannelOption.SO_BACKLOG,
128).childOption(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.ALLOCATOR, bufAllocator)
- .childHandler(new ChannelInitializer<SocketChannel>() {
+ .option(ChannelOption.ALLOCATOR, bufAllocator).childHandler(new
ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
if (_tlsConfig != null) {
@@ -165,4 +158,9 @@ public class QueryServer {
_bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
+
+ @VisibleForTesting
+ ServerSocketChannel getChannel() {
+ return _channel;
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
new file mode 100644
index 0000000000..bef2609067
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import io.netty.channel.ChannelHandler;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.common.config.NettyConfig;
+import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class QueryServerTest {
+ @DataProvider
+ public Object[][] parameters() {
+ return new Object[][]{
+ new Object[]{true}, new Object[]{false},
+ };
+ }
+
+ @Test(dataProvider = "parameters")
+ public void startAndStop(final boolean nativeTransportEnabled) {
+ PinotMetricUtils.init(new PinotConfiguration());
+ PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+ ServerMetrics.register(new ServerMetrics(registry));
+ NettyConfig nettyConfig = new NettyConfig();
+ nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+ TlsConfig tlsConfig = new TlsConfig();
+ ChannelHandler channelHandler = mock(ChannelHandler.class);
+
+ QueryServer server = new QueryServer(0, nettyConfig, tlsConfig,
channelHandler);
+ server.start();
+
+ final InetSocketAddress serverAddress = server.getChannel().localAddress();
+
+ assertTrue(connectionOk(serverAddress));
+
+ server.shutDown();
+ assertFalse(connectionOk(serverAddress));
+ }
+
+ private static boolean connectionOk(InetSocketAddress address) {
+ Socket s = null;
+ try {
+ s = new Socket(address.getHostName(), address.getPort());
+ return s.isConnected();
+ } catch (Exception e) {
+ return false;
+ } finally {
+ IOUtils.closeQuietly(s);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]