This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c475e90d1 [INLONG-11544][Manager] Optimize the configuration of the
Manager schedule module (#11545)
4c475e90d1 is described below
commit 4c475e90d1bdc4b74fbcd8e311d96fc677f1aa85
Author: Zkplo <[email protected]>
AuthorDate: Tue Nov 26 16:51:54 2024 +0800
[INLONG-11544][Manager] Optimize the configuration of the Manager schedule
module (#11545)
Co-authored-by: ZKpLo <[email protected]>
---
.../schedule/airflow/AirflowScheduleEngine.java | 4 +--
.../schedule/airflow/config/AirflowConfig.java | 34 ++++++++++++++++++---
.../dolphinscheduler/DolphinScheduleEngine.java | 16 ++++------
.../dolphinscheduler/DolphinScheduleOperator.java | 8 ++---
.../dolphinscheduler/DolphinScheduleUtils.java | 35 +++++++++++-----------
.../src/main/resources/application-dev.properties | 4 +--
.../src/main/resources/application-prod.properties | 4 +--
.../src/main/resources/application-test.properties | 4 +--
8 files changed, 65 insertions(+), 44 deletions(-)
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
index 792307e6ae..80d67f2281 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
@@ -97,8 +97,8 @@ public class AirflowScheduleEngine implements ScheduleEngine {
new AirflowConnectionGetter(airflowConfig.getConnectionId()));
if (!response.isSuccess()) {
AirflowConnection newConn = new
AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
- airflowConfig.getHost(),
airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
- airflowConfig.getPort(),
airflowConfig.getInlongPassword(), "");
+ airflowConfig.getInlongManagerHost(),
airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
+ airflowConfig.getInlongManagerPort(),
airflowConfig.getInlongPassword(), "");
response = serverClient.sendRequest(new
AirflowConnectionCreator(newConn));
LOGGER.info("AirflowConnection registration response: {}",
response.toString());
if (!response.isSuccess()) {
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
index 489712abe9..9e7ffbf9d0 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
@@ -27,10 +27,17 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;
+import org.eclipse.jetty.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import javax.annotation.PostConstruct;
+
+import java.net.URL;
+
@Data
@Configuration
@NoArgsConstructor
@@ -38,11 +45,12 @@ import org.springframework.context.annotation.Configuration;
@EqualsAndHashCode(callSuper = true)
public class AirflowConfig extends ClientConfiguration {
- @Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
- private String host;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AirflowConfig.class);
+ @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
+ private String inlongManagerUrl;
- @Value("${server.port:8083}")
- private int port;
+ private String inlongManagerHost;
+ private int inlongManagerPort;
@Value("${default.admin.user:admin}")
private String inlongUsername;
@@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration {
@Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
private String baseUrl;
+ @PostConstruct
+ public void init() {
+ try {
+ if (StringUtil.isNotBlank(inlongManagerUrl)) {
+ URL url = new URL(inlongManagerUrl);
+ this.inlongManagerHost = url.getHost();
+ this.inlongManagerPort = url.getPort();
+ if (this.inlongManagerPort == -1) {
+ this.inlongManagerPort = 8083;
+ }
+ }
+ LOGGER.info("Init AirflowConfig success for manager url ={}",
this.inlongManagerUrl);
+ } catch (Exception e) {
+ LOGGER.error("Init AirflowConfig failed for manager url={}: ",
this.inlongManagerUrl, e);
+ }
+ }
+
@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
@@ -79,6 +104,7 @@ public class AirflowConfig extends ClientConfiguration {
.retryOnConnectionFailure(true)
.build();
}
+
@Bean
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient,
AirflowConfig airflowConfig) {
return new AirflowServerClient(okHttpClient, airflowConfig);
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
index 5123068eab..c2d6ef0094 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
@@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine
{
private static final Logger LOGGER =
LoggerFactory.getLogger(DolphinScheduleEngine.class);
- @Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
- private String host;
-
- @Value("${server.port:8083}")
- private int port;
+ @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
+ private String inlongManagerUrl;
@Value("${default.admin.user:admin}")
private String username;
@@ -86,10 +83,10 @@ public class DolphinScheduleEngine implements
ScheduleEngine {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
}
- public DolphinScheduleEngine(String host, int port, String username,
String password, String dolphinUrl,
+ public DolphinScheduleEngine(String inlongManagerUrl, String username,
String password,
+ String dolphinUrl,
String token) {
- this.host = host;
- this.port = port;
+ this.inlongManagerUrl = inlongManagerUrl;
this.username = username;
this.password = password;
this.dolphinUrl = dolphinUrl;
@@ -161,8 +158,7 @@ public class DolphinScheduleEngine implements
ScheduleEngine {
long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
processDefCode =
dolphinScheduleOperator.createProcessDef(processDefUrl,
token, processName, processDesc, taskCode,
- host, port,
- username, password, offset,
scheduleInfo.getInlongGroupId());
+ inlongManagerUrl, username, password, offset,
scheduleInfo.getInlongGroupId());
LOGGER.info("Create process definition success, process definition
code: {}", processDefCode);
if (dolphinScheduleOperator.releaseProcessDef(processDefUrl,
processDefCode, token, DS_ONLINE_STATE)) {
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
index e317478c64..8a7d9cbe2b 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
@@ -92,11 +92,11 @@ public class DolphinScheduleOperator {
/**
* Creates a process definition in DolphinScheduler.
*/
- public long createProcessDef(String url, String token, String name, String
desc, long taskCode, String host,
- int port, String username, String password, long offset, String
groupId) {
+ public long createProcessDef(String url, String token, String name, String
desc, long taskCode,
+ String inlongManagerUrl, String username, String password, long
offset, String groupId) {
try {
- return DolphinScheduleUtils.createProcessDef(url, token, name,
desc, taskCode, host,
- port, username, password, offset, groupId);
+ return DolphinScheduleUtils.createProcessDef(url, token, name,
desc, taskCode, inlongManagerUrl, username,
+ password, offset, groupId);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating process definition: ",
e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
index 5fd6dd3629..ee28c6973f 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
@@ -282,22 +282,21 @@ public class DolphinScheduleUtils {
/**
* Creates a process definition in DolphinScheduler.
*
- * @param url The base URL of the DolphinScheduler API.
- * @param token The authentication token to be used in the request
header.
- * @param name The name of the process definition.
- * @param desc The description of the process definition.
- * @param taskCode The task code to be associated with this process
definition.
- * @param host The host where the process will run.
- * @param port The port where the process will run.
- * @param username The username for authentication.
- * @param password The password for authentication.
- * @param offset The offset for the scheduling.
- * @param groupId The group ID of the process.
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the
request header.
+ * @param name The name of the process definition.
+ * @param desc The description of the process definition.
+ * @param taskCode The task code to be associated with this
process definition.
+ * @param inlongManagerUrl The Inlong Manager base URL that the script calls back.
+ * @param username The username for authentication.
+ * @param password The password for authentication.
+ * @param offset The offset for the scheduling.
+ * @param groupId The group ID of the process.
* @return The process definition code (ID) if creation is successful, or
0 if an error occurs.
*/
public static long createProcessDef(String url, String token, String name,
String desc,
- long taskCode, String host,
- int port, String username, String password, long offset, String
groupId) throws Exception {
+ long taskCode, String inlongManagerUrl, String username, String
password, long offset, String groupId)
+ throws Exception {
try {
Map<String, String> header = buildHeader(token);
@@ -306,7 +305,7 @@ public class DolphinScheduleUtils {
String taskRelationJson =
MAPPER.writeValueAsString(Collections.singletonList(taskRelation));
DSTaskParams taskParams = new DSTaskParams();
- taskParams.setRawScript(buildScript(host, port, username,
password, offset, groupId));
+ taskParams.setRawScript(buildScript(inlongManagerUrl, username,
password, offset, groupId));
DSTaskDefinition taskDefinition = new DSTaskDefinition();
taskDefinition.setCode(taskCode);
@@ -774,10 +773,10 @@ public class DolphinScheduleUtils {
* When process definition schedule run, the shell node run,
* Call back in inlong, sending a request with parameters required
*/
- private static String buildScript(String host, int port, String username,
String password, long offset,
+ private static String buildScript(String inlongManagerUrl, String
username, String password, long offset,
String groupId) {
- LOGGER.info("build script for host: {}, port: {}, username: {},
password: {}, offset: {}, groupId: {}", host,
- port, username, password, offset, groupId);
+ LOGGER.info("build script for Inlong Manager Url: {}, username: {},
password: {}, offset: {}, groupId: {}",
+ inlongManagerUrl, username, password, offset, groupId);
return "#!/bin/bash\n\n" +
// Get current timestamp
@@ -789,7 +788,7 @@ public class DolphinScheduleUtils {
// Set URL
"# Set URL and HTTP method\n" +
- "url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
+ "url=\"" + inlongManagerUrl + SHELL_REQUEST_API +
"?username=" + username + "&password=" + password + "\"\n" +
"echo \"get url: ${url}\"\n" +
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 39900e1fdb..8a9ae13ccc 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 4de0f65d20..f5bde10caf 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 96e33e5fd4..905915df6b 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=