This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7a7e66d28 [Feature] Add application backup clean task (#3160)
7a7e66d28 is described below

commit 7a7e66d28925587beccbbcebf96ddb18bc20534d
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Sep 22 22:13:02 2023 +0800

    [Feature] Add application backup clean task (#3160)
---
 .../core/service/ApplicationBackUpService.java     |  2 -
 .../service/impl/ApplicationBackUpServiceImpl.java | 23 ++------
 .../core/task/ApplicationBackUpCleanTask.java      | 67 ++++++++++++++++++++++
 .../console/core/task/FlinkAppLostWatcher.java     |  2 +-
 .../src/main/resources/application.yml             |  6 ++
 .../streampark/gateway/session/SessionHandle.java  |  2 +-
 6 files changed, 80 insertions(+), 22 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
index 8966826e0..6f441e2f1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java
@@ -41,6 +41,4 @@ public interface ApplicationBackUpService extends 
IService<ApplicationBackUp> {
   void removeApp(Application appParam);
 
   void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam);
-
-  boolean isFlinkSqlBacked(Long appId, Long sqlId);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
index f6b6337b3..6afe5b45e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationBackUpServiceImpl.java
@@ -165,7 +165,11 @@ public class ApplicationBackUpServiceImpl
 
   @Override
   public void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam) {
-    ApplicationBackUp backUp = getFlinkSqlBackup(appParam.getId(), 
flinkSqlParam.getId());
+    LambdaQueryWrapper<ApplicationBackUp> queryWrapper =
+        new LambdaQueryWrapper<ApplicationBackUp>()
+            .eq(ApplicationBackUp::getAppId, appParam.getId())
+            .eq(ApplicationBackUp::getSqlId, flinkSqlParam.getId());
+    ApplicationBackUp backUp = baseMapper.selectOne(queryWrapper);
     ApiAlertException.throwIfNull(
         backUp, "Application backup can't be null. Rollback flink sql 
failed.");
     // rollback config and sql
@@ -173,23 +177,6 @@ public class ApplicationBackUpServiceImpl
     effectiveService.saveOrUpdate(backUp.getAppId(), EffectiveType.FLINKSQL, 
backUp.getSqlId());
   }
 
-  @Override
-  public boolean isFlinkSqlBacked(Long appId, Long sqlId) {
-    LambdaQueryWrapper<ApplicationBackUp> queryWrapper =
-        new LambdaQueryWrapper<ApplicationBackUp>()
-            .eq(ApplicationBackUp::getAppId, appId)
-            .eq(ApplicationBackUp::getSqlId, sqlId);
-    return baseMapper.selectCount(queryWrapper) > 0;
-  }
-
-  private ApplicationBackUp getFlinkSqlBackup(Long appId, Long sqlId) {
-    LambdaQueryWrapper<ApplicationBackUp> queryWrapper =
-        new LambdaQueryWrapper<ApplicationBackUp>()
-            .eq(ApplicationBackUp::getAppId, appId)
-            .eq(ApplicationBackUp::getSqlId, sqlId);
-    return baseMapper.selectOne(queryWrapper);
-  }
-
   @Override
   public Boolean delete(Long id) throws InternalException {
     ApplicationBackUp backUp = getById(id);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
new file mode 100644
index 000000000..e1a7056b2
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.console.core.entity.ApplicationBackUp;
+import org.apache.streampark.console.core.service.ApplicationBackUpService;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class ApplicationBackUpCleanTask {
+
+  private final ApplicationBackUpService backUpService;
+
+  @Value("${streampark.backup-clean.max-backup-num:5}")
+  public Integer maxBackupNum;
+
+  @Scheduled(cron = "${streampark.backup-clean.exec-cron:0 0 1 * * ?}")
+  public void backUpClean() {
+    log.info("Start to clean application backup");
+    // select all application backup which count > maxBackupNum group by app_id
+    backUpService.lambdaQuery().groupBy(ApplicationBackUp::getAppId)
+        .having("count(*) > " + maxBackupNum).list().stream()
+        .map(ApplicationBackUp::getAppId)
+        .forEach(
+            appId -> {
+              // order by create_time desc and skip first maxBackupNum records 
and delete others
+              backUpService.lambdaQuery().eq(ApplicationBackUp::getAppId, 
appId)
+                  
.orderByDesc(ApplicationBackUp::getCreateTime).list().stream()
+                  .skip(maxBackupNum)
+                  .forEach(
+                      backUp -> {
+                        try {
+                          backUpService.delete(backUp.getId());
+                        } catch (Exception e) {
+                          log.error(
+                              "Clean application backup failed for app id: {} 
, backup id: {}",
+                              appId,
+                              backUp.getId(),
+                              e);
+                        }
+                      });
+            });
+    log.info("Clean application backup finished");
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
index d9f7381fd..ebb3ab76c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppLostWatcher.java
@@ -65,7 +65,7 @@ public class FlinkAppLostWatcher {
 
   private long lastWatchTime = 0L;
 
-  private AtomicBoolean isProbing = new AtomicBoolean(false);
+  private final AtomicBoolean isProbing = new AtomicBoolean(false);
 
   private Short retryAttempts = PROBE_RETRY_COUNT;
 
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 6bf9b23dc..e2e34e6ed 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -131,6 +131,12 @@ streampark:
     max-resource-expired-hours: 120
     # gc task running interval hours
     exec-cron: 0 0 0/6 * * ?
+  # application backup clean configuration
+  backup-clean:
+    # maximum retention number of backup
+    max-backup-num: 5
+    # default running once a day
+    exec-cron: 0 0 1 * * ?
 
   shiro:
     # token timeout, unit second
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
index d530d68c3..663261ab1 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
@@ -51,6 +51,6 @@ public class SessionHandle {
 
   @Override
   public String toString() {
-    return identifier.toString();
+    return identifier;
   }
 }

Reply via email to