This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ace3362d6f [INLONG-11531][Manager] Fix bug in DolphinScheduler engine
(#11532)
ace3362d6f is described below
commit ace3362d6f94fcc3b913d935791b4e66f06f6d65
Author: emptyOVO <[email protected]>
AuthorDate: Mon Nov 25 18:48:32 2024 +0800
[INLONG-11531][Manager] Fix bug in DolphinScheduler engine (#11532)
---
.../dolphinscheduler/DolphinScheduleConstants.java | 3 ++
.../dolphinscheduler/DolphinScheduleEngine.java | 2 ++
.../dolphinscheduler/DolphinScheduleUtils.java | 40 +++++++++++++++++++---
.../exception/DolphinScheduleException.java | 1 +
.../DolphinScheduleEngineTest.java | 1 -
5 files changed, 42 insertions(+), 5 deletions(-)
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
index 89dcda5b77..1488ca1fe8 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
@@ -22,6 +22,7 @@ public class DolphinScheduleConstants {
// DS public constants
public static final String DS_ID = "id";
public static final String DS_CODE = "code";
+ public static final String DS_SUCCESS = "success";
public static final String DS_TOKEN = "token";
public static final String DS_PAGE_SIZE = "pageSize";
public static final String DS_PAGE_NO = "pageNo";
@@ -29,6 +30,8 @@ public class DolphinScheduleConstants {
public static final String DS_RESPONSE_DATA = "data";
public static final String DS_RESPONSE_NAME = "name";
public static final String DS_RESPONSE_TOTAL_LIST = "totalList";
+ public static final int DS_DEFAULT_RETRY_TIMES = 3;
+ public static final int DS_DEFAULT_WAIT_MILLS = 1000;
public static final String DS_DEFAULT_PAGE_SIZE = "10";
public static final String DS_DEFAULT_PAGE_NO = "1";
public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai";
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
index dd0c6d0c81..5123068eab 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
@@ -132,6 +132,7 @@ public class DolphinScheduleEngine implements
ScheduleEngine {
@Override
@VisibleForTesting
public boolean handleRegister(ScheduleInfo scheduleInfo) {
+ start();
String processDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL;
String scheduleUrl = projectBaseUrl + "/" + projectCode +
DS_SCHEDULE_URL;
String processName = scheduleInfo.getInlongGroupId() +
DS_DEFAULT_PROCESS_NAME;
@@ -191,6 +192,7 @@ public class DolphinScheduleEngine implements
ScheduleEngine {
@Override
@VisibleForTesting
public boolean handleUnregister(String groupId) {
+ start();
String processName = groupId + DS_DEFAULT_PROCESS_NAME;
String processDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL;
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
index 87cb1c5127..5fd6dd3629 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
@@ -57,11 +57,13 @@ import java.util.stream.StreamSupport;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_RETRY_TIMES;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_WAIT_MILLS;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO;
@@ -78,6 +80,7 @@ import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedul
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SUCCESS;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM;
import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION;
@@ -89,6 +92,7 @@ import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleExcept
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR;
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR;
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_IN_USED_ERROR;
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED;
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED;
import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED;
@@ -489,20 +493,48 @@ public class DolphinScheduleUtils {
Map<String, String> header = buildHeader(token);
String requestUrl = url + "/" + code;
+ for (int retryTime = 1; retryTime <= DS_DEFAULT_RETRY_TIMES;
retryTime++) {
+ JsonObject response = executeHttpRequest(requestUrl, DELETE,
new HashMap<>(), header);
+ if (response.get(DS_CODE).getAsInt() ==
PROCESS_DEFINITION_IN_USED_ERROR) {
+
+ LOGGER.warn(
+ "Retrying for current retry time ={}, maximum
retry count={}, code={}, url={}, after {} ms...",
+ retryTime, DS_DEFAULT_RETRY_TIMES, code, url,
DS_DEFAULT_WAIT_MILLS);
+ Thread.sleep(DS_DEFAULT_WAIT_MILLS);
+
+ } else if (response.get(DS_SUCCESS).getAsBoolean()) {
+ LOGGER.info("Delete process or project success, response
data: {}", response);
+ return;
+ } else {
+ LOGGER.warn("Delete process or project failed, response
data: {}", response);
+ }
- JsonObject response = executeHttpRequest(requestUrl, DELETE, new
HashMap<>(), header);
- LOGGER.info("delete process or project success, response data:
{}", response);
+ }
+ LOGGER.error(
+ "Maximum retry attempts reached for deleting process or
project. URL: {}, Code: {}",
+ url, code);
+ throw new DolphinScheduleException(
+ DELETION_FAILED,
+ String.format("Failed to delete after %d retries. Code: %d
at URL: %s",
+ DS_DEFAULT_RETRY_TIMES, code, url));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Thread interrupted while retrying delete process or
project: ", e);
+ throw new DolphinScheduleException(
+ DELETION_FAILED,
+ String.format("Thread interrupted while retrying delete
for code: %d at URL: %s", code, url));
} catch (JsonParseException e) {
LOGGER.error("JsonParseException during deleting process or
project", e);
throw new DolphinScheduleException(
JSON_PARSE_ERROR,
- String.format("Error deleting process or project with
code: %d at URL: %s", code, url), e);
+ String.format("Error deleting process or project with
code: %d at URL: %s", code, url));
} catch (DolphinScheduleException e) {
+ LOGGER.error("Error deleting process or project for code={},
url={} ", code, url, e);
throw new DolphinScheduleException(
DELETION_FAILED,
- String.format("Error deleting process or project with
code: %d at URL: %s", code, url), e);
+ String.format("Error deleting process or project with
code: %d at URL: %s", code, url));
}
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
index 348697b672..b5238a3a35 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
@@ -42,6 +42,7 @@ public class DolphinScheduleException extends
RuntimeException {
public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED";
// Process-related error codes
+ public static final int PROCESS_DEFINITION_IN_USED_ERROR = 10163;
public static final String PROCESS_DEFINITION_QUERY_FAILED =
"PROCESS_DEFINITION_QUERY_FAILED";
public static final String PROCESS_DEFINITION_CREATION_FAILED =
"PROCESS_DEFINITION_CREATION_FAILED";
public static final String PROCESS_DEFINITION_RELEASE_FAILED =
"PROCESS_DEFINITION_RELEASE_FAILED";
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
index f95a5268ee..b63b04e736 100644
---
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
@@ -53,7 +53,6 @@ public class DolphinScheduleEngineTest extends
DolphinScheduleContainerTestEnv {
String token = accessToken();
dolphinScheduleEngine.setToken(token);
- dolphinScheduleEngine.start();
}
@AfterAll