This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 46f3f7e4 [#133] feat(netty): Add StreamServer. (#718)
46f3f7e4 is described below
commit 46f3f7e43ca9d432495823380bb5ceff4edcc427
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Mar 15 20:17:09 2023 +0800
[#133] feat(netty): Add StreamServer. (#718)
### What changes were proposed in this pull request?
Add StreamServer for netty to replace grpc.
### Why are the changes needed?
Add StreamServer.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
Co-authored-by: leixianming <[email protected]>
---
server/pom.xml | 5 +
.../org/apache/uniffle/server/ShuffleServer.java | 13 ++
.../apache/uniffle/server/ShuffleServerConf.java | 55 ++++++++
.../apache/uniffle/server/netty/StreamServer.java | 142 +++++++++++++++++++++
.../netty/decoder/StreamServerInitDecoder.java | 48 +++++++
.../apache/uniffle/server/ShuffleServerTest.java | 22 ++++
6 files changed, 285 insertions(+)
diff --git a/server/pom.xml b/server/pom.xml
index 582aee83..97907a1e 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -107,6 +107,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index a117dba0..3a47a81d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -51,6 +51,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.netty.StreamServer;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.util.StorageType;
@@ -92,6 +93,8 @@ public class ShuffleServer {
private volatile boolean running;
private ExecutorService executorService;
private Future<?> decommissionFuture;
+ private boolean nettyServerEnabled;
+ private StreamServer streamServer;
public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
@@ -124,6 +127,9 @@ public class ShuffleServer {
registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
+ if (nettyServerEnabled) {
+ streamServer.start();
+ }
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
@@ -164,6 +170,9 @@ public class ShuffleServer {
}
SecurityContextFactory.get().getSecurityContext().close();
server.stop();
+ if (nettyServerEnabled && streamServer != null) {
+ streamServer.stop();
+ }
if (executorService != null) {
executorService.shutdownNow();
}
@@ -221,6 +230,10 @@ public class ShuffleServer {
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf,
shuffleFlushManager);
shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf,
shuffleFlushManager,
shuffleBufferManager, storageManager);
+ nettyServerEnabled =
shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
+ if (nettyServerEnabled) {
+ streamServer = new StreamServer(this);
+ }
setServer();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 10b0d2ad..f4d26749 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -404,6 +404,61 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(-1)
.withDescription("Shuffle netty server port");
+ // Boolean switch, off by default. StreamServer creates Epoll event loop groups
+ // when this is true and NIO event loop groups otherwise.
+ public static final ConfigOption<Boolean> NETTY_SERVER_EPOLL_ENABLE =
ConfigOptions
+ .key("rss.server.netty.epoll.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("If enable epoll model with netty server");
+
+ // Thread count handed to the boss (accept) event loop group built by
+ // StreamServer. Defaults to 10.
+ public static final ConfigOption<Integer> NETTY_SERVER_ACCEPT_THREAD =
ConfigOptions
+ .key("rss.server.netty.accept.thread")
+ .intType()
+ .defaultValue(10)
+ .withDescription("Accept thread count in netty");
+
+ // Thread count handed to the worker event loop group built by StreamServer.
+ // Defaults to 100.
+ public static final ConfigOption<Integer> NETTY_SERVER_WORKER_THREAD =
ConfigOptions
+ .key("rss.server.netty.worker.thread")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Worker thread count in netty");
+
+ // Idle timeout for netty handlers, default 60000. Not read by StreamServer in
+ // this change. NOTE(review): unit is presumably milliseconds - confirm.
+ public static final ConfigOption<Long> SERVER_NETTY_HANDLER_IDLE_TIMEOUT =
ConfigOptions
+ .key("rss.server.netty.handler.idle.timeout")
+ .longType()
+ .defaultValue(60000L)
+ .withDescription("Idle timeout if there has not data");
+
+ // Value StreamServer passes as ChannelOption.SO_BACKLOG on the server channel.
+ // Defaults to 0. NOTE(review): a backlog of 0 may be treated by the JDK/OS as
+ // "use the implementation default" rather than "no backlog" - confirm the
+ // wording of the description.
+ public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_BACKLOG =
ConfigOptions
+ .key("rss.server.netty.connect.backlog")
+ .intType()
+ .defaultValue(0)
+ .withDescription("For netty server, requested maximum length of the
queue of incoming connections. "
+ + "Default 0 for no backlog.");
+
+ // Value StreamServer passes as ChannelOption.CONNECT_TIMEOUT_MILLIS (so it is
+ // expressed in milliseconds) for both the server and child channels.
+ // Defaults to 5000.
+ public static final ConfigOption<Integer> NETTY_SERVER_CONNECT_TIMEOUT =
ConfigOptions
+ .key("rss.server.netty.connect.timeout")
+ .intType()
+ .defaultValue(5000)
+ .withDescription("Timeout for connection in netty");
+
+  /**
+   * Requested socket send buffer size (SO_SNDBUF) for connections accepted by the
+   * netty stream server. StreamServer only applies it when the value is greater
+   * than 0; the default of 0 leaves the option unset.
+   */
+  public static final ConfigOption<Integer> NETTY_SERVER_SEND_BUF = ConfigOptions
+      .key("rss.server.netty.send.buf")
+      .intType()
+      .defaultValue(0)
+      // Each segment ends with a space so the concatenated sentences do not run together.
+      .withDescription("the optimal size for send buffer(SO_SNDBUF) "
+          + "should be latency * network_bandwidth. Assuming latency = 1ms, "
+          + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
+          + "Default is 0, OS will dynamically adjust the buf size.");
+
+  /**
+   * Requested socket receive buffer size (SO_RCVBUF) for connections accepted by
+   * the netty stream server. StreamServer only applies it when the value is
+   * greater than 0; the default of 0 leaves the option unset.
+   */
+  public static final ConfigOption<Integer> NETTY_SERVER_RECEIVE_BUF = ConfigOptions
+      .key("rss.server.netty.receive.buf")
+      .intType()
+      .defaultValue(0)
+      // Each segment ends with a space so the concatenated sentences do not run together.
+      .withDescription("the optimal size for receive buffer(SO_RCVBUF) "
+          + "should be latency * network_bandwidth. Assuming latency = 1ms, "
+          + "network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. "
+          + "Default is 0, OS will dynamically adjust the buf size.");
+
public ShuffleServerConf() {
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
new file mode 100644
index 00000000..8423515b
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.netty.decoder.StreamServerInitDecoder;
+
+/**
+ * Netty-based stream server owned by a {@link ShuffleServer}.
+ *
+ * <p>The constructor creates the boss and worker event loop groups from the
+ * server configuration, {@link #start()} binds the configured port, and
+ * {@link #stop()} closes the bound channel and shuts the groups down.
+ */
+public class StreamServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamServer.class);
+
+  private ShuffleServer shuffleServer;
+  private EventLoopGroup shuffleBossGroup;
+  private EventLoopGroup shuffleWorkerGroup;
+  private ShuffleServerConf shuffleServerConf;
+  private ChannelFuture channelFuture;
+
+  /**
+   * Creates the event loop groups, choosing the epoll or NIO flavour according to
+   * {@code NETTY_SERVER_EPOLL_ENABLE}.
+   *
+   * @param shuffleServer the owning server; its configuration is read here
+   */
+  public StreamServer(ShuffleServer shuffleServer) {
+    this.shuffleServer = shuffleServer;
+    this.shuffleServerConf = shuffleServer.getShuffleServerConf();
+    final boolean useEpoll = shuffleServerConf.getBoolean(ShuffleServerConf.NETTY_SERVER_EPOLL_ENABLE);
+    final int bossThreadCount = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_ACCEPT_THREAD);
+    final int workerThreadCount = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_WORKER_THREAD);
+    if (!useEpoll) {
+      shuffleBossGroup = new NioEventLoopGroup(bossThreadCount);
+      shuffleWorkerGroup = new NioEventLoopGroup(workerThreadCount);
+      return;
+    }
+    shuffleBossGroup = new EpollEventLoopGroup(bossThreadCount);
+    shuffleWorkerGroup = new EpollEventLoopGroup(workerThreadCount);
+  }
+
+  /**
+   * Builds a configured, not yet bound, {@link ServerBootstrap}.
+   *
+   * @param bossGroup group used for the server channel; its concrete type selects
+   *     the epoll or NIO server channel class
+   * @param workerGroup group used for accepted channels
+   * @param backlogSize value for {@code SO_BACKLOG}
+   * @param timeoutMillis value for {@code CONNECT_TIMEOUT_MILLIS} on server and child channels
+   * @param sendBuf value for {@code SO_SNDBUF}; only applied when greater than 0
+   * @param receiveBuf value for {@code SO_RCVBUF}; only applied when greater than 0
+   * @param handlerSupplier called once per accepted channel to obtain its handlers
+   * @return the configured bootstrap
+   */
+  private ServerBootstrap bootstrapChannel(
+      EventLoopGroup bossGroup,
+      EventLoopGroup workerGroup,
+      int backlogSize,
+      int timeoutMillis,
+      int sendBuf,
+      int receiveBuf,
+      Supplier<ChannelHandler[]> handlerSupplier) {
+    final ServerBootstrap bootstrap = new ServerBootstrap();
+    bootstrap.group(bossGroup, workerGroup);
+
+    // The server channel implementation has to match the event loop group flavour.
+    final boolean epollBacked = bossGroup instanceof EpollEventLoopGroup;
+    if (epollBacked) {
+      bootstrap.channel(EpollServerSocketChannel.class);
+    } else {
+      bootstrap.channel(NioServerSocketChannel.class);
+    }
+
+    // Each accepted connection gets its own handler instances from the supplier.
+    final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(final SocketChannel ch) {
+        ch.pipeline().addLast(handlerSupplier.get());
+      }
+    };
+    bootstrap.childHandler(pipelineInitializer);
+
+    // Options of the listening channel.
+    bootstrap.option(ChannelOption.SO_BACKLOG, backlogSize);
+    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis);
+    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+    // Options of every accepted channel.
+    bootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis);
+    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+
+    // Non-positive sizes leave the socket buffer options unset.
+    if (sendBuf > 0) {
+      bootstrap.childOption(ChannelOption.SO_SNDBUF, sendBuf);
+    }
+    if (receiveBuf > 0) {
+      bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBuf);
+    }
+    return bootstrap;
+  }
+
+  /**
+   * Binds the server to {@code NETTY_SERVER_PORT} and waits for the bind to finish.
+   *
+   * <p>Any exception raised while binding is passed to
+   * {@code ExitUtils.terminate} with status 1.
+   */
+  public void start() {
+    final Supplier<ChannelHandler[]> handlerFactory =
+        () -> new ChannelHandler[]{new StreamServerInitDecoder()};
+    final int backlog = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_BACKLOG);
+    final int connectTimeoutMs = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_CONNECT_TIMEOUT);
+    final int sendBufferSize = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_SEND_BUF);
+    final int receiveBufferSize = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_RECEIVE_BUF);
+    final ServerBootstrap bootstrap = bootstrapChannel(
+        shuffleBossGroup,
+        shuffleWorkerGroup,
+        backlog,
+        connectTimeoutMs,
+        sendBufferSize,
+        receiveBufferSize,
+        handlerFactory);
+
+    // Bind the port and keep the resulting future so that stop() can close the channel later.
+    final int port = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
+    try {
+      channelFuture = bootstrap.bind(port);
+      channelFuture.syncUninterruptibly();
+      LOG.info("bind localAddress is " + channelFuture.channel().localAddress());
+      LOG.info("Start stream server successfully with port " + port);
+    } catch (Exception e) {
+      ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+    }
+  }
+
+  /**
+   * Closes the bound channel, waiting up to 10 seconds for the close to complete,
+   * then requests a graceful shutdown of both event loop groups. Calling it again
+   * afterwards does nothing because the references are cleared.
+   */
+  public void stop() {
+    if (channelFuture != null) {
+      channelFuture.channel().close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
+      channelFuture = null;
+    }
+    if (shuffleBossGroup == null) {
+      return;
+    }
+    shuffleBossGroup.shutdownGracefully();
+    shuffleWorkerGroup.shutdownGracefully();
+    shuffleBossGroup = null;
+    shuffleWorkerGroup = null;
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
b/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
new file mode 100644
index 00000000..d2551e55
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/netty/decoder/StreamServerInitDecoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.netty.decoder;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * First decoder installed on every stream server connection.
+ *
+ * <p>It peeks at the first readable byte and hands it to {@code addDecoder},
+ * which is currently an empty placeholder. No bytes are consumed and nothing is
+ * added to the output list.
+ */
+public class StreamServerInitDecoder extends ByteToMessageDecoder {
+
+  public StreamServerInitDecoder() {
+  }
+
+  /**
+   * Placeholder for installing a decoder chosen by the given type byte; the body
+   * is intentionally empty for now.
+   */
+  private void addDecoder(ChannelHandlerContext ctx, byte type) {
+
+  }
+
+  /**
+   * Reads the leading byte without advancing the reader index and forwards it to
+   * {@code addDecoder}. Does nothing until at least one byte is readable.
+   */
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
+    if (in.readableBytes() >= Byte.BYTES) {
+      // Mark, read, then reset so the byte stays available in the buffer.
+      in.markReaderIndex();
+      final byte leadingByte = in.readByte();
+      in.resetReaderIndex();
+
+      addDecoder(ctx, leadingByte);
+    }
+  }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 6dc5e56e..a2cc0895 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -134,4 +134,26 @@ public class ShuffleServerTest {
serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
return serverConf;
}
+
+  /**
+   * Starts one server with the netty port set, then starts a second server that
+   * reuses that netty port (with different RPC and Jetty ports) and expects the
+   * second start to fail with an ExitException carrying status 1.
+   */
+  @Test
+  public void nettyServerTest() throws Exception {
+    ShuffleServerConf conf = createShuffleServerConf();
+    conf.set(ShuffleServerConf.NETTY_SERVER_PORT, 29999);
+    ShuffleServer firstServer = new ShuffleServer(conf);
+    firstServer.start();
+    ExitUtils.disableSystemExit();
+
+    // Only the RPC and Jetty ports change, so the netty port collides with the first server.
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 19997);
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19996);
+    ShuffleServer secondServer = new ShuffleServer(conf);
+    final String expectMessage = "Fail to start stream server";
+    final int expectStatus = 1;
+
+    Exception startFailure = null;
+    try {
+      secondServer.start();
+    } catch (Exception e) {
+      startFailure = e;
+    }
+    if (startFailure == null) {
+      fail();
+    }
+    assertEquals(expectMessage, startFailure.getMessage());
+    assertEquals(expectStatus, ((ExitException) startFailure).getStatus());
+  }
}