This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 23cc78bb15 [Improve][Zeta] Jetty server enable dynamic port (#7903)
23cc78bb15 is described below
commit 23cc78bb157071dd28f313316f24e2d2df481352
Author: corgy-w <[email protected]>
AuthorDate: Sun Oct 27 18:09:50 2024 +0800
[Improve][Zeta] Jetty server enable dynamic port (#7903)
---
config/seatunnel.yaml | 1 +
docs/en/seatunnel-engine/rest-api-v2.md | 7 +++--
docs/zh/seatunnel-engine/rest-api-v2.md | 7 ++++-
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 3 +-
.../src/test/resources/cluster/seatunnel.yaml | 1 +
.../config/YamlSeaTunnelDomConfigProcessor.java | 6 ++++
.../engine/common/config/server/HttpConfig.java | 4 +++
.../common/config/server/ServerConfigOptions.java | 14 +++++++++
.../config/YamlSeaTunnelConfigParserTest.java | 7 +++++
.../src/test/resources/seatunnel.yaml | 9 +++++-
.../seatunnel/engine/server/JettyService.java | 35 +++++++++++++++++++++-
11 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index c383239423..4e7ef0c700 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -40,3 +40,4 @@ seatunnel:
http:
enable-http: true
port: 8080
+ enable-dynamic-port: false
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index acd8bd636d..2c642dd8fb 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -10,8 +10,9 @@ completed jobs. The monitoring API is a RESTful API that
accepts HTTP requests a
## Overview
The v2 version of the api uses jetty support. It is the same as the interface
specification of v1 version
-, you can specify the port and context-path by modifying the configuration
items in `seatunnel.yaml`
-
+, you can specify the port and context-path by modifying the configuration
items in `seatunnel.yaml`.
+You can also set `enable-dynamic-port` to enable dynamic ports (the search
starts at `port` and counts upward); it is disabled by default.
+If `enable-dynamic-port` is true, the first unused port in the range from
`port` to `port` + `port-range` is used; the default `port-range` is 100.
```yaml
@@ -20,6 +21,8 @@ seatunnel:
http:
enable-http: true
port: 8080
+ enable-dynamic-port: false
+ port-range: 100
```
Context-path can also be configured as follows:
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 4964415cb2..0ec9741d40 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -8,7 +8,10 @@ SeaTunnel有一个用于监控的API,可用于查询运行作业的状态和
## 概述
-v2版本的api使用jetty支持,与v1版本的接口规范相同 ,可以通过修改`seatunnel.yaml`中的配置项来指定端口和context-path
+v2版本的api使用jetty支持,与v1版本的接口规范相同 ,可以通过修改`seatunnel.yaml`中的配置项来指定端口和context-path,
+同时可以配置 `enable-dynamic-port` 开启动态端口(默认从 `port` 开始累加),默认为关闭,
+如果`enable-dynamic-port`为`true`,我们将使用`port`和`port`+`port-range`范围内未使用的端口,默认范围是100。
+
```yaml
seatunnel:
@@ -16,6 +19,8 @@ seatunnel:
http:
enable-http: true
port: 8080
+ enable-dynamic-port: false
+ port-range: 100
```
同时也可以配置context-path,配置如下:
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index e0070ba2b1..4dd4a11b31 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -105,7 +105,8 @@ public class RestApiIT {
node2Tags.setAttribute("node", "node2");
Config node2hzconfig =
node1Config.getHazelcastConfig().setMemberAttributeConfig(node2Tags);
node2Config = ConfigProvider.locateAndGetSeaTunnelConfig();
- node2Config.getEngineConfig().getHttpConfig().setPort(8081);
+ // Dynamically generated port
+
node2Config.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
node2Config.getEngineConfig().getHttpConfig().setEnabled(true);
node2Config.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
node2Config.getEngineConfig().getSlotServiceConfig().setSlotNum(20);
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
index 0720832a67..5878531b2e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
@@ -34,3 +34,4 @@ seatunnel:
http:
enable-http: true
port: 8080
+ enable-dynamic-port: false
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index eefde7f8b8..ffa984de1f 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -368,6 +368,12 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
httpConfig.setContextPath(getTextContent(node));
} else if (ServerConfigOptions.ENABLE_HTTP.key().equals(name)) {
httpConfig.setEnabled(getBooleanValue(getTextContent(node)));
+ } else if
(ServerConfigOptions.ENABLE_DYNAMIC_PORT.key().equals(name)) {
+
httpConfig.setEnableDynamicPort(getBooleanValue(getTextContent(node)));
+ } else if (ServerConfigOptions.PORT_RANGE.key().equals(name)) {
+ httpConfig.setPortRange(
+ getIntegerValue(
+ ServerConfigOptions.PORT_RANGE.key(),
getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
index d9594202a4..d0227263a5 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
@@ -32,6 +32,10 @@ public class HttpConfig implements Serializable {
private String contextPath =
ServerConfigOptions.CONTEXT_PATH.defaultValue();
+ private boolean enableDynamicPort =
ServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue();
+
+ private int portRange = ServerConfigOptions.PORT_RANGE.defaultValue();
+
public void setPort(int port) {
checkPositive(port, ServerConfigOptions.HTTP + " must be > 0");
this.port = port;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index e74ff59978..37ec8674a1 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -247,6 +247,20 @@ public class ServerConfigOptions {
.defaultValue("")
.withDescription("The context path of the http server.");
+ public static final Option<Boolean> ENABLE_DYNAMIC_PORT =
+ Options.key("enable-dynamic-port")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable the dynamic port of the http
server. If true, We will use the unused port");
+
+ public static final Option<Integer> PORT_RANGE =
+ Options.key("port-range")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "The port range of the http server. If
enable-dynamic-port is true, We will use the unused port in the range");
+
public static final Option<HttpConfig> HTTP =
Options.key("http")
.type(new TypeReference<HttpConfig>() {})
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index 3fe398cafe..42e6aa681b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -69,6 +69,13 @@ public class YamlSeaTunnelConfigParserTest {
.getStorage()
.getStoragePluginConfig()
.get("fs.defaultFS"));
+
+ Assertions.assertFalse(
+
config.getEngineConfig().getTelemetryConfig().getMetric().isEnabled());
+
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnabled());
+
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
+ Assertions.assertEquals(8080,
config.getEngineConfig().getHttpConfig().getPort());
+ Assertions.assertEquals(200,
config.getEngineConfig().getHttpConfig().getPortRange());
}
@Test
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 8453bdeeca..88f8c3f9bb 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -32,4 +32,11 @@ seatunnel:
namespace: /tmp/seatunnel/checkpoint_snapshot
storage.type: hdfs
fs.defaultFS: file:/// # Ensure that the directory has
written permission
-
+ telemetry:
+ metric:
+ enabled: false
+ http:
+ enable-http: true
+ port: 8080
+ enable-dynamic-port: true
+ port-range: 200
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index ea14c729f2..4e72915be3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -47,6 +47,9 @@ import lombok.extern.slf4j.Slf4j;
import javax.servlet.DispatcherType;
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
import java.net.URL;
import java.util.EnumSet;
@@ -71,6 +74,7 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.UPDATE_TAGS_U
/** The Jetty service for SeaTunnel engine server. */
@Slf4j
public class JettyService {
+
private NodeEngineImpl nodeEngine;
private SeaTunnelConfig seaTunnelConfig;
Server server;
@@ -78,7 +82,14 @@ public class JettyService {
public JettyService(NodeEngineImpl nodeEngine, SeaTunnelConfig
seaTunnelConfig) {
this.nodeEngine = nodeEngine;
this.seaTunnelConfig = seaTunnelConfig;
- this.server = new
Server(seaTunnelConfig.getEngineConfig().getHttpConfig().getPort());
+ int port = seaTunnelConfig.getEngineConfig().getHttpConfig().getPort();
+ if
(seaTunnelConfig.getEngineConfig().getHttpConfig().isEnableDynamicPort()) {
+ port =
+ chooseAppropriatePort(
+ port,
seaTunnelConfig.getEngineConfig().getHttpConfig().getPortRange());
+ }
+ log.info("SeaTunnel REST service will start on port {}", port);
+ this.server = new Server(port);
}
public void createJettyServer() {
@@ -166,4 +177,26 @@ public class JettyService {
private static String convertUrlToPath(String url) {
return url + "/*";
}
+
+ public int chooseAppropriatePort(int initialPort, int portRange) {
+ int port = initialPort;
+
+ while (port <= initialPort + portRange) {
+ if (!isPortInUse(port)) {
+ return port;
+ }
+ port++;
+ }
+
+ throw new RuntimeException("Jetty failed to start, No available port
found in the range!");
+ }
+
+    /**
+     * Checks whether the given port is already occupied on this host.
+     *
+     * <p>The port is probed by briefly binding both a TCP {@link ServerSocket} and a UDP {@link
+     * DatagramSocket} to it; both sockets are closed again before this method returns.
+     *
+     * @param port the port number to probe
+     * @return {@code true} if the port lies outside 0-65535 or at least one of the two sockets
+     *     could not be bound, {@code false} if both binds succeeded
+     */
+    private boolean isPortInUse(int port) {
+        // The socket constructors reject ports outside 0-65535 with IllegalArgumentException
+        // (not IOException), so report such ports as unusable instead of letting that escape.
+        if (port < 0 || port > 65535) {
+            return true;
+        }
+        // NOTE(review): the HTTP server presumably only needs the TCP port; the extra UDP probe
+        // makes this check stricter than that - confirm whether it is intended.
+        try (ServerSocket ss = new ServerSocket(port);
+                DatagramSocket ds = new DatagramSocket(port)) {
+            return false;
+        } catch (IOException e) {
+            // A failed bind is interpreted as "port already taken".
+            return true;
+        }
+    }
}