This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7476cea0 [#720] feat(netty): support random port for netty (#723)
7476cea0 is described below
commit 7476cea0095803d655cc5346a65feb765ec68885
Author: xumanbu <[email protected]>
AuthorDate: Mon Apr 3 23:16:22 2023 +0800
[#720] feat(netty): support random port for netty (#723)
### What changes were proposed in this pull request?
Support random Netty server port
1、add RssUitls.startServiceOnPort to support start service with random port
2、StreamServer implement ServerInterface to support random port
### Why are the changes needed?
Fix #720
### Does this PR introduce _any_ user-facing change?
StreamServer support random port,if port==0 start server with random port
### How was this patch tested?
UT
Co-authored-by: jam.xu <[email protected]>
---
.../apache/uniffle/common/config/ConfigUtils.java | 2 +
.../apache/uniffle/common/config/RssBaseConf.java | 18 ++++
.../org/apache/uniffle/common/rpc/GrpcServer.java | 9 +-
.../apache/uniffle/common/rpc/ServerInterface.java | 4 +-
.../org/apache/uniffle/common/util/Constants.java | 1 +
.../org/apache/uniffle/common/util/RssUtils.java | 60 +++++++++++
.../apache/uniffle/common/util/RssUtilsTest.java | 111 +++++++++++++++++++++
docs/server_guide.md | 1 +
.../test/ShuffleServerEnableStreamServerTest.java | 71 +++++++++++++
.../org/apache/uniffle/server/ShuffleServer.java | 5 +-
.../apache/uniffle/server/ShuffleServerConf.java | 3 +-
.../apache/uniffle/server/netty/StreamServer.java | 29 +++++-
.../apache/uniffle/server/ShuffleServerTest.java | 1 +
13 files changed, 306 insertions(+), 9 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index bc0672eb..1a78abcf 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -207,6 +207,8 @@ public class ConfigUtils {
public static final Function<Long, Boolean> NON_NEGATIVE_LONG_VALIDATOR =
value -> value >= 0;
+ public static final Function<Integer, Boolean> SERVER_PORT_VALIDATOR = value
-> ((value == 0)
+
|| (value >= 1024 && value <= 65535));
public static final Function<Long, Boolean> POSITIVE_INTEGER_VALIDATOR =
value -> value > 0L && value <= Integer.MAX_VALUE;
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 761e5bf3..09322a62 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -212,6 +212,24 @@ public class RssBaseConf extends RssConf {
.defaultValue(5L)
.withDescription("Reconfigure check interval.");
+ public static final ConfigOption<Integer> RSS_RANDOM_PORT_MIN = ConfigOptions
+ .key("rss.random.port.min")
+ .intType()
+ .defaultValue(40000)
+ .withDescription("Min value for random for range");
+
+ public static final ConfigOption<Integer> RSS_RANDOM_PORT_MAX = ConfigOptions
+ .key("rss.random.port.max")
+ .intType()
+ .defaultValue(65535)
+ .withDescription("Max value for random for range");
+
+ public static final ConfigOption<Integer> SERVER_PORT_MAX_RETRIES =
ConfigOptions
+ .key("rss.port.max.retry")
+ .intType()
+ .defaultValue(16)
+ .withDescription("start server service max retry");
+
public boolean loadCommonConf(Map<String, String> properties) {
if (properties == null) {
return false;
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 59ade455..d96ee40b 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -155,7 +155,7 @@ public class GrpcServer implements ServerInterface {
}
}
- public void start() throws IOException {
+ public int start() throws IOException {
try {
server.start();
listenPort = server.getPort();
@@ -163,6 +163,13 @@ public class GrpcServer implements ServerInterface {
ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
}
LOG.info("Grpc server started, configured port: {}, listening on {}.",
port, listenPort);
+ return port;
+ }
+
+ @Override
+ public void startOnPort(int port) {
+ ExitUtils.terminate(1, "Fail to start grpc server",
+ new RuntimeException("GRpcServer not implement now"), LOG);
}
public void stop() throws InterruptedException {
diff --git
a/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
b/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
index 67cb8d5c..ab6a91b5 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/ServerInterface.java
@@ -21,7 +21,9 @@ import java.io.IOException;
public interface ServerInterface {
- void start() throws IOException;
+ int start() throws IOException;
+
+ void startOnPort(int port) throws Exception;
void stop() throws InterruptedException;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index b62d5b03..5e946866 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -71,4 +71,5 @@ public final class Constants {
public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on
device";
+ public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index dfae9e23..888e96fa 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
+import java.net.BindException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InterfaceAddress;
@@ -39,11 +40,14 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
+import io.netty.channel.unix.Errors;
+import org.eclipse.jetty.util.MultiException;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +56,8 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.rpc.ServerInterface;
+
public class RssUtils {
@@ -156,6 +162,60 @@ public class RssUtils {
return siteLocalAddress;
}
+ public static int startServiceOnPort(ServerInterface service, String
serviceName, int servicePort, RssBaseConf conf) {
+ if (servicePort < 0 || servicePort > 65535) {
+ throw new IllegalArgumentException(String.format("Bad service %s on port
(%s)", serviceName, servicePort));
+ }
+ int actualPort = servicePort;
+ int maxRetries = conf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES);
+ for (int i = 0; i < maxRetries; i++) {
+ try {
+ if (servicePort == 0) {
+ actualPort = findRandomTcpPort(conf);
+ } else {
+ actualPort += i;
+ }
+ service.startOnPort(actualPort);
+ return actualPort;
+ } catch (Exception e) {
+ if (isServerPortBindCollision(e)) {
+ LOGGER.warn(String.format("%s:Service %s failed after %s retries (on
a random free port (%s))!",
+ e.getMessage(), serviceName, i + 1, actualPort));
+ } else {
+ throw new RssException(String.format("Failed to start service %s on
port %s", serviceName, servicePort), e);
+ }
+ }
+ }
+ throw new RssException(String.format("Failed to start service %s on port
%s", serviceName, servicePort));
+ }
+
+ /**
+ * check whether the exception is caused by an address-port collision when
binding.
+ */
+ public static boolean isServerPortBindCollision(Throwable e) {
+ if (e instanceof BindException) {
+ if (e.getMessage() != null) {
+ return true;
+ }
+ return isServerPortBindCollision(e.getCause());
+ } else if (e instanceof MultiException) {
+ return !((MultiException) e).getThrowables().stream()
+ .noneMatch((Throwable throwable) ->
isServerPortBindCollision(throwable));
+ } else if (e instanceof Errors.NativeIoException) {
+ return (e.getMessage() != null && e.getMessage().startsWith("bind()
failed: "))
+ || isServerPortBindCollision(e.getCause());
+ } else {
+ return false;
+ }
+ }
+
+ public static int findRandomTcpPort(RssBaseConf baseConf) {
+ int portRangeMin = baseConf.getInteger(RssBaseConf.RSS_RANDOM_PORT_MIN);
+ int portRangeMax = baseConf.getInteger(RssBaseConf.RSS_RANDOM_PORT_MAX);
+ int portRange = portRangeMax - portRangeMin;
+ return portRangeMin + ThreadLocalRandom.current().nextInt(portRange + 1);
+ }
+
public static byte[] serializeBitMap(Roaring64NavigableMap bitmap) throws
IOException {
long size = bitmap.serializedSizeInBytes();
if (size > Integer.MAX_VALUE) {
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 759dd09c..a98bbe19 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -17,9 +17,12 @@
package org.apache.uniffle.common.util;
+import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -27,6 +30,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import javax.net.ServerSocketFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -37,6 +41,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.ServerInterface;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -87,6 +92,70 @@ public class RssUtilsTest {
}
}
+ @Test
+ public void testStartServiceOnPort() throws InterruptedException {
+ RssBaseConf rssBaseConf = new RssBaseConf();
+ rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 100);
+ rssBaseConf.set(RssBaseConf.RSS_RANDOM_PORT_MIN, 30000);
+ rssBaseConf.set(RssBaseConf.RSS_RANDOM_PORT_MAX, 39999);
+ // zero port to get random port
+ MockServer mockServer = new MockServer();
+ int port = 0;
+ try {
+ int actualPort = RssUtils.startServiceOnPort(mockServer, "MockServer",
port, rssBaseConf);
+ assertTrue(actualPort >= 30000 && actualPort < 39999 +
rssBaseConf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES));
+ } finally {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+ // error port test
+ try {
+ port = -1;
+ RssUtils.startServiceOnPort(mockServer, "MockServer", port, rssBaseConf);
+ } catch (RuntimeException e) {
+ assertTrue(e.toString().startsWith("java.lang.IllegalArgumentException:
Bad service"));
+ }
+ // a specific port to start
+ try {
+ mockServer = new MockServer();
+ port = 10000;
+ rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 100);
+ int actualPort = RssUtils.startServiceOnPort(mockServer, "MockServer",
port, rssBaseConf);
+ assertTrue(actualPort >= port && actualPort < port +
rssBaseConf.get(RssBaseConf.SERVER_PORT_MAX_RETRIES));
+ } finally {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+
+ // bind exception
+ MockServer toStartSockServer = new MockServer();
+ try {
+ mockServer = new MockServer();
+ port = 10000;
+ int actualPort1 = RssUtils.startServiceOnPort(mockServer, "MockServer",
port, rssBaseConf);
+ rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 10);
+ int actualPort2 = RssUtils.startServiceOnPort(toStartSockServer,
"MockServer", actualPort1, rssBaseConf);
+ assertTrue(actualPort1 < actualPort2);
+ toStartSockServer.stop();
+ rssBaseConf.set(RssBaseConf.SERVER_PORT_MAX_RETRIES, 0);
+ RssUtils.startServiceOnPort(toStartSockServer, "MockServer",
actualPort1, rssBaseConf);
+ assertFalse(false);
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().startsWith("Failed to start service"));
+ } finally {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ if (toStartSockServer != null) {
+ toStartSockServer.stop();
+ }
+ }
+
+ }
+
+
@Test
public void testSerializeBitmap() throws Exception {
Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100,
10000);
@@ -235,5 +304,47 @@ public class RssUtilsTest {
}
}
+ public static class MockServer implements ServerInterface {
+
+ ServerSocket serverSocket;
+
+ @Override
+ public int start() throws IOException {
+ // not implement
+ return -1;
+ }
+
+ @Override
+ public void startOnPort(int port) throws IOException {
+ serverSocket = ServerSocketFactory.getDefault().createServerSocket(
+ port, 1, InetAddress.getByName("localhost"));
+ new Thread(() -> {
+ Socket accept;
+ try {
+ accept = serverSocket.accept();
+ accept.close();
+ } catch (IOException e) {
+ //e.printStackTrace();
+ }
+ }).start();
+ }
+
+ @Override
+ public void stop() throws InterruptedException {
+ if (serverSocket != null && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ //e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void blockUntilShutdown() throws InterruptedException {
+ // not implement
+ }
+ }
+
}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index bfae4405..4b3dbedd 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,6 +69,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.coordinator.quorum | - |
Coordinator quorum
|
| rss.rpc.server.port | - | RPC port
for Shuffle server
|
| rss.jetty.http.port | - | Http port
for Shuffle server
|
+| rss.server.netty.port | -1 | Netty port
for Shuffle server, if set zero, netty server start on random port.
|
| rss.server.buffer.capacity | -1 | Max memory
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio
is used
|
| rss.server.buffer.capacity.ratio | 0.8 | when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size *
ratio
|
| rss.server.memory.shuffle.highWaterMark.percentage | 75.0 | Threshold
of spill data to storage, percentage of rss.server.buffer.capacity
|
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
new file mode 100644
index 00000000..f98af1a4
--- /dev/null
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
+ @BeforeAll
+ public static void setupServers() throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ coordinatorConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
+
coordinatorConf.setString(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY.key(),
"BASIC");
+ coordinatorConf.setLong("rss.coordinator.app.expired", 2000);
+ coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
+ createCoordinatorServer(coordinatorConf);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setInteger("rss.server.netty.port", 0);
+ shuffleServerConf.setInteger("rss.random.port.min", 30000);
+ shuffleServerConf.setInteger("rss.random.port.max", 40000);
+ createShuffleServer(shuffleServerConf);
+ shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT +
1);
+ shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ @Test
+ public void startStreamServerOnRandomPort() throws Exception {
+ CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
+ Thread.sleep(5000);
+ int actualPort = shuffleServers.get(0).getNettyPort();
+ assertTrue(actualPort >= 30000 && actualPort < 40000);
+ actualPort = shuffleServers.get(1).getNettyPort();
+ assertTrue(actualPort >= 30000 && actualPort <= 40000);
+
+ int maxRetries = 100;
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
+ shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
+ shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT +
2);
+ shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
+ ShuffleServer ss = new ShuffleServer(shuffleServerConf);
+ ss.start();
+ assertTrue(ss.getNettyPort() > actualPort && actualPort <= actualPort +
maxRetries);
+ ss.stopServer();
+ }
+
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index bde07d2b..0a1d2cd2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -129,7 +129,7 @@ public class ShuffleServer {
jettyServer.start();
server.start();
if (nettyServerEnabled) {
- streamServer.start();
+ nettyPort = streamServer.start();
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -195,7 +195,8 @@ public class ShuffleServer {
}
grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
nettyPort =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
- if (nettyPort > 0) {
+ if (nettyPort >= 0) {
+ // when nettyPort is zero,actual netty port will be changed,but id can't
be change.
id = ip + "-" + grpcPort + "-" + nettyPort;
} else {
id = ip + "-" + grpcPort;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index f4d26749..5f7b21e1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -400,7 +400,8 @@ public class ShuffleServerConf extends RssBaseConf {
public static final ConfigOption<Integer> NETTY_SERVER_PORT = ConfigOptions
.key("rss.server.netty.port")
.intType()
- .checkValue(ConfigUtils.POSITIVE_INTEGER_VALIDATOR_2, "netty port must
be positive")
+ .checkValue(ConfigUtils.SERVER_PORT_VALIDATOR, "check server port value
is 0 "
+ + "or value >= 1024 && value <= 65535")
.defaultValue(-1)
.withDescription("Shuffle netty server port");
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index 8423515b..57f91338 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server.netty;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -35,12 +36,15 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.netty.decoder.StreamServerInitDecoder;
-public class StreamServer {
+public class StreamServer implements ServerInterface {
private static final Logger LOG =
LoggerFactory.getLogger(StreamServer.class);
@@ -103,7 +107,20 @@ public class StreamServer {
return serverBootstrap;
}
- public void start() {
+ @Override
+ public int start() throws IOException {
+ int port =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
+ try {
+ port = RssUtils.startServiceOnPort(this,
+ Constants.NETTY_STREAM_SERVICE_NAME, port, shuffleServerConf);
+ } catch (Exception e) {
+ ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+ }
+ return port;
+ }
+
+ @Override
+ public void startOnPort(int port) throws Exception {
Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[]{
new StreamServerInitDecoder()
};
@@ -116,14 +133,13 @@ public class StreamServer {
// Bind the ports and save the results so that the channels can be closed
later.
// If the second bind fails, the first one gets cleaned up in the shutdown.
- int port =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
try {
channelFuture = serverBootstrap.bind(port);
channelFuture.syncUninterruptibly();
LOG.info("bind localAddress is " +
channelFuture.channel().localAddress());
LOG.info("Start stream server successfully with port " + port);
} catch (Exception e) {
- ExitUtils.terminate(1, "Fail to start stream server", e, LOG);
+ throw e;
}
}
@@ -139,4 +155,9 @@ public class StreamServer {
shuffleWorkerGroup = null;
}
}
+
+ @Override
+ public void blockUntilShutdown() throws InterruptedException {
+
+ }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 0dc57338..7fe77fa2 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -138,6 +138,7 @@ public class ShuffleServerTest {
ExitUtils.disableSystemExit();
serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19997);
serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19996);
+ serverConf.set(ShuffleServerConf.SERVER_PORT_MAX_RETRIES, 1);
ShuffleServer ss2 = new ShuffleServer(serverConf);
String expectMessage = "Fail to start stream server";
final int expectStatus = 1;