This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new af6c6c009 [INLONG-8021][Manager] Periodically delete sources with inconsistent states (#8028)
af6c6c009 is described below
commit af6c6c0095c145db1badb7e201f6ef55cbf4557f
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 16 12:22:34 2023 +0800
[INLONG-8021][Manager] Periodically delete sources with inconsistent states (#8028)
---
CHANGES.md | 1 +
.../dao/mapper/StreamSourceEntityMapper.java | 6 +++
.../resources/mappers/StreamSourceEntityMapper.xml | 11 ++++
.../service/core/impl/AgentServiceImpl.java | 62 ++++++++++++++--------
.../src/main/resources/application-dev.properties | 8 ++-
.../src/main/resources/application-prod.properties | 4 ++
.../src/main/resources/application-test.properties | 5 ++
7 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 48c92680f..101f7f406 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -49,6 +49,7 @@
### Manager
| ISSUE | Summary |
|:-----------------------------------------------------------:|:-----------------------------------------------------------------------------------------------|
+| [INLONG-8021](https://github.com/apache/inlong/issues/8021) | [Improve][Manager] Periodically delete sources with inconsistent states |
| [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster |
| [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node |
| [INLONG-7996](https://github.com/apache/inlong/issues/7996) | [Improve][Manager] Support issued kafka consumer group to sort |
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 6d8610ab3..fda794429 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -186,6 +186,12 @@ public interface StreamSourceEntityMapper {
*/
void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds);
+ /**
+ * Update the source status when it has been deleted
+ *
+ */
+ void updateStatusByDeleted();
+
/**
* Physical delete stream sources by group id and stream id
*/
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index d29200b87..43d8045e0 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -552,6 +552,17 @@
</where>
</update>
+ <update id="updateStatusByDeleted">
+ update stream_source
+ <set>
+ previous_status = status,
+ status = 201,
+ </set>
+ <where>
+ is_deleted != 0
+ and status not in (99, 201, 301)
+ </where>
+ </update>
<delete id="deleteByRelatedId">
delete
from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 1faf01561..53ccea256 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -86,6 +86,9 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
@@ -115,9 +118,15 @@ public class AgentServiceImpl implements AgentService {
new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
new CallerRunsPolicy());
@Value("${source.update.enabled:false}")
- private Boolean enabled;
+ private Boolean updateTaskTimeoutEnabled;
@Value("${source.update.before.seconds:60}")
private Integer beforeSeconds;
+ @Value("${source.update.interval:60}")
+ private Integer updateTaskInterval;
+ @Value("${source.cleansing.enabled:false}")
+ private Boolean sourceCleanEnabled;
+ @Value("${source.cleansing.interval:600}")
+ private Integer cleanInterval;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
@@ -138,11 +147,38 @@ public class AgentServiceImpl implements AgentService {
*/
@PostConstruct
private void startHeartbeatTask() {
- if (enabled) {
- UpdateTaskRunnable taskRunnable = new UpdateTaskRunnable();
- this.executorService.execute(taskRunnable);
+ if (updateTaskTimeoutEnabled) {
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("scheduled-source-timeout-%d")
+ .setDaemon(true)
+ .build();
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(factory);
+ executor.scheduleWithFixedDelay(() -> {
+ try {
+ sourceMapper.updateStatusToTimeout(beforeSeconds);
+ LOGGER.info("update task status successfully");
+ } catch (Throwable t) {
+ LOGGER.error("update task status error", t);
+ }
+ }, 0, updateTaskInterval, TimeUnit.SECONDS);
LOGGER.info("update task status started successfully");
}
+ if (sourceCleanEnabled) {
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("scheduled-source-deleted-%d")
+ .setDaemon(true)
+ .build();
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(factory);
+ executor.scheduleWithFixedDelay(() -> {
+ try {
+ sourceMapper.updateStatusByDeleted();
+ LOGGER.info("clean task successfully");
+ } catch (Throwable t) {
+ LOGGER.error("clean task error", t);
+ }
+ }, 0, cleanInterval, TimeUnit.SECONDS);
+ LOGGER.info("clean task started successfully");
+ }
}
@Override
@@ -645,22 +681,4 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
- /**
- * update task status when task timeout
- */
- private class UpdateTaskRunnable implements Runnable {
-
- @Override
- public void run() {
- while (true) {
- try {
- sourceMapper.updateStatusToTimeout(beforeSeconds);
- Thread.sleep(beforeSeconds * 1000);
- } catch (Throwable t) {
- LOGGER.error("update task status runnable error", t);
- }
- }
- }
- }
-
}
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 5bd94df1d..d705bf953 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -87,6 +87,12 @@ data.cleansing.batchSize=100
sort.enable.zookeeper=false
# If turned on, synchronizing change the source status when the agent heartbeat times out
-source.update.enabled=false
+source.update.enabled=true
source.update.before.seconds=60
+source.update.interval=60
+
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=true
+source.cleansing.interval=600
+
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index c07e79f04..e31b8f3c1 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -88,4 +88,8 @@ sort.enable.zookeeper=false
# If turned on, synchronizing change the source status when the agent heartbeat times out
source.update.enabled=false
source.update.before.seconds=60
+source.update.interval=60
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=false
+source.cleansing.interval=600
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 13822969a..3d2e7e3ed 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -89,3 +89,8 @@ sort.enable.zookeeper=false
# If turned on, synchronizing change the source status when the agent heartbeat times out
source.update.enabled=false
source.update.before.seconds=60
+source.update.interval=60
+
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=false
+source.cleansing.interval=600