This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new af6c6c009 [INLONG-8021][Manager] Periodically delete sources with 
inconsistent states (#8028)
af6c6c009 is described below

commit af6c6c0095c145db1badb7e201f6ef55cbf4557f
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 16 12:22:34 2023 +0800

    [INLONG-8021][Manager] Periodically delete sources with inconsistent states 
(#8028)
---
 CHANGES.md                                         |  1 +
 .../dao/mapper/StreamSourceEntityMapper.java       |  6 +++
 .../resources/mappers/StreamSourceEntityMapper.xml | 11 ++++
 .../service/core/impl/AgentServiceImpl.java        | 62 ++++++++++++++--------
 .../src/main/resources/application-dev.properties  |  8 ++-
 .../src/main/resources/application-prod.properties |  4 ++
 .../src/main/resources/application-test.properties |  5 ++
 7 files changed, 74 insertions(+), 23 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 48c92680f..101f7f406 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -49,6 +49,7 @@
 ### Manager
 |                            ISSUE                            | Summary        
                                                                                
|
 
|:-----------------------------------------------------------:|:-----------------------------------------------------------------------------------------------|
+| [INLONG-8021](https://github.com/apache/inlong/issues/8021) | 
[Improve][Manager] Periodically delete sources with inconsistent states         
               |
 | [INLONG-8006](https://github.com/apache/inlong/issues/8006) | 
[Improve][Manager] Set displayname for the auto-registered cluster              
               |
 | [INLONG-7999](https://github.com/apache/inlong/issues/7999) | 
[Improve][Manager] Support PostgreSQL data node                                 
               |
 | [INLONG-7996](https://github.com/apache/inlong/issues/7996) | 
[Improve][Manager] Support issued kafka consumer group to sort                  
               |
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 6d8610ab3..fda794429 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -186,6 +186,12 @@ public interface StreamSourceEntityMapper {
      */
     void updateStatusToTimeout(@Param("beforeSeconds") Integer beforeSeconds);
 
+    /**
+     * Update the source status when it has been deleted
+     *
+     */
+    void updateStatusByDeleted();
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index d29200b87..43d8045e0 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -552,6 +552,17 @@
         </where>
     </update>
 
+    <update id="updateStatusByDeleted">
+        update stream_source
+        <set>
+            previous_status = status,
+            status = 201,
+        </set>
+        <where>
+            is_deleted != 0
+            and status not in (99, 201, 301)
+        </where>
+    </update>
     <delete id="deleteByRelatedId">
         delete
         from stream_source
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 1faf01561..53ccea256 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -86,6 +86,9 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
@@ -115,9 +118,15 @@ public class AgentServiceImpl implements AgentService {
             new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
             new CallerRunsPolicy());
     @Value("${source.update.enabled:false}")
-    private Boolean enabled;
+    private Boolean updateTaskTimeoutEnabled;
     @Value("${source.update.before.seconds:60}")
     private Integer beforeSeconds;
+    @Value("${source.update.interval:60}")
+    private Integer updateTaskInterval;
+    @Value("${source.cleansing.enabled:false}")
+    private Boolean sourceCleanEnabled;
+    @Value("${source.cleansing.interval:600}")
+    private Integer cleanInterval;
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
@@ -138,11 +147,38 @@ public class AgentServiceImpl implements AgentService {
      */
     @PostConstruct
     private void startHeartbeatTask() {
-        if (enabled) {
-            UpdateTaskRunnable taskRunnable = new UpdateTaskRunnable();
-            this.executorService.execute(taskRunnable);
+        if (updateTaskTimeoutEnabled) {
+            ThreadFactory factory = new ThreadFactoryBuilder()
+                    .setNameFormat("scheduled-source-timeout-%d")
+                    .setDaemon(true)
+                    .build();
+            ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(factory);
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    sourceMapper.updateStatusToTimeout(beforeSeconds);
+                    LOGGER.info("update task status successfully");
+                } catch (Throwable t) {
+                    LOGGER.error("update task status error", t);
+                }
+            }, 0, updateTaskInterval, TimeUnit.SECONDS);
             LOGGER.info("update task status started successfully");
         }
+        if (sourceCleanEnabled) {
+            ThreadFactory factory = new ThreadFactoryBuilder()
+                    .setNameFormat("scheduled-source-deleted-%d")
+                    .setDaemon(true)
+                    .build();
+            ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(factory);
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    sourceMapper.updateStatusByDeleted();
+                    LOGGER.info("clean task successfully");
+                } catch (Throwable t) {
+                    LOGGER.error("clean task error", t);
+                }
+            }, 0, cleanInterval, TimeUnit.SECONDS);
+            LOGGER.info("clean task started successfully");
+        }
     }
 
     @Override
@@ -645,22 +681,4 @@ public class AgentServiceImpl implements AgentService {
         return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
     }
 
-    /**
-     * update task status when task timeout
-     */
-    private class UpdateTaskRunnable implements Runnable {
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    sourceMapper.updateStatusToTimeout(beforeSeconds);
-                    Thread.sleep(beforeSeconds * 1000);
-                } catch (Throwable t) {
-                    LOGGER.error("update task status runnable error", t);
-                }
-            }
-        }
-    }
-
 }
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 5bd94df1d..d705bf953 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -87,6 +87,12 @@ data.cleansing.batchSize=100
 sort.enable.zookeeper=false
 
 # If turned on, synchronizing change the source status when the agent 
heartbeat times out
-source.update.enabled=false
+source.update.enabled=true
 source.update.before.seconds=60
+source.update.interval=60
+
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=true
+source.cleansing.interval=600
+
 
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index c07e79f04..e31b8f3c1 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -88,4 +88,8 @@ sort.enable.zookeeper=false
 # If turned on, synchronizing change the source status when the agent 
heartbeat times out
 source.update.enabled=false
 source.update.before.seconds=60
+source.update.interval=60
 
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=false
+source.cleansing.interval=600
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 13822969a..3d2e7e3ed 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -89,3 +89,8 @@ sort.enable.zookeeper=false
 # If turned on, synchronizing change the source status when the agent 
heartbeat times out
 source.update.enabled=false
 source.update.before.seconds=60
+source.update.interval=60
+
+# If turned on, tasks in the incorrect state are periodically deleted
+source.cleansing.enabled=false
+source.cleansing.interval=600

Reply via email to