This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6685875e72 [Improvement](statistics)Mark unfinished analysis job 
failed after master reboot
6685875e72 is described below

commit 6685875e72292505c2bcdfe639784c5e89d06ea2
Author: Jibing-Li <[email protected]>
AuthorDate: Thu Sep 21 23:11:50 2023 +0800

    [Improvement](statistics)Mark unfinished analysis job failed after master 
reboot
    
    Previously, Pending/Running analysis tasks/jobs stayed in an unfinished 
status forever after an FE reboot, which is misleading.
    In this PR, jobs/tasks are written to the edit log only when they finish, 
so unfinished tasks/jobs are abandoned after a reboot.
    Also return without retrying when ANALYZE TABLE is cancelled by the user 
in sync mode.
---
 .../doris/analysis/ShowAnalyzeTaskStatus.java      |  5 ++++
 .../java/org/apache/doris/persist/EditLog.java     | 13 ++++++--
 .../apache/doris/statistics/AnalysisManager.java   | 35 ++++++++++++++++------
 .../apache/doris/statistics/BaseAnalysisTask.java  |  3 ++
 .../doris/statistics/AnalysisManagerTest.java      |  2 +-
 5 files changed, 46 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
index 927a56d19d..7c6c5cf17f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
@@ -59,4 +59,9 @@ public class ShowAnalyzeTaskStatus extends ShowStmt {
     public long getJobId() {
         return jobId;
     }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 25d6bd089d..6543173f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -86,6 +86,7 @@ import org.apache.doris.resource.workloadgroup.WorkloadGroup;
 import org.apache.doris.scheduler.job.Job;
 import org.apache.doris.scheduler.job.JobTask;
 import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.statistics.TableStats;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
@@ -1070,11 +1071,19 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_CREATE_ANALYSIS_JOB: {
-                    
env.getAnalysisManager().replayCreateAnalysisJob((AnalysisInfo) 
journal.getData());
+                    AnalysisInfo info = (AnalysisInfo) journal.getData();
+                    if (AnalysisManager.needAbandon(info)) {
+                        break;
+                    }
+                    env.getAnalysisManager().replayCreateAnalysisJob(info);
                     break;
                 }
                 case OperationType.OP_CREATE_ANALYSIS_TASK: {
-                    
env.getAnalysisManager().replayCreateAnalysisTask((AnalysisInfo) 
journal.getData());
+                    AnalysisInfo info = (AnalysisInfo) journal.getData();
+                    if (AnalysisManager.needAbandon(info)) {
+                        break;
+                    }
+                    env.getAnalysisManager().replayCreateAnalysisTask(info);
                     break;
                 }
                 case OperationType.OP_DELETE_ANALYSIS_JOB: {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 83c3cc84e4..ce0c546675 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -151,7 +151,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             // Set the job state to RUNNING when its first task becomes 
RUNNING.
             if (info.state.equals(AnalysisState.RUNNING) && 
job.state.equals(AnalysisState.PENDING)) {
                 job.state = AnalysisState.RUNNING;
-                logCreateAnalysisJob(job);
+                replayCreateAnalysisJob(job);
             }
             boolean allFinished = true;
             boolean hasFailure = false;
@@ -372,7 +372,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             updateTableStats(jobInfo);
             return null;
         }
-        persistAnalysisJob(jobInfo);
+        recordAnalysisJob(jobInfo);
         analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
         // TODO: maybe we should update table stats only when all task 
succeeded.
         updateTableStats(jobInfo);
@@ -554,13 +554,13 @@ public class AnalysisManager extends Daemon implements 
Writable {
     }
 
     @VisibleForTesting
-    public void persistAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
+    public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
         if (jobInfo.scheduleType == ScheduleType.PERIOD && 
jobInfo.lastExecTimeInMs > 0) {
             return;
         }
         AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo);
         AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build();
-        logCreateAnalysisJob(analysisInfo);
+        replayCreateAnalysisJob(analysisInfo);
     }
 
     public void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, 
BaseAnalysisTask> analysisTasks,
@@ -584,7 +584,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             }
             try {
                 if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
-                    logCreateAnalysisTask(analysisInfo);
+                    replayCreateAnalysisTask(analysisInfo);
                 }
             } catch (Exception e) {
                 throw new DdlException("Failed to create analysis task", e);
@@ -623,7 +623,7 @@ public class AnalysisManager extends Daemon implements 
Writable {
             return;
         }
         try {
-            logCreateAnalysisTask(analysisInfo);
+            replayCreateAnalysisTask(analysisInfo);
         } catch (Exception e) {
             throw new DdlException("Failed to create analysis task", e);
         }
@@ -833,9 +833,8 @@ public class AnalysisManager extends Daemon implements 
Writable {
                 executor.submit(() -> {
                     try {
                         if (cancelled) {
-                            errorMessages.add("Cancelled since query timeout,"
-                                    + "you could set could query_timeout or 
parallel_sync_analyze_task_num "
-                                    + "to a bigger value and try again");
+                            errorMessages.add("Query timeout or user 
cancelled."
+                                    + "Could set analyze_timeout to a bigger 
value.");
                             return;
                         }
                         try {
@@ -927,10 +926,28 @@ public class AnalysisManager extends Daemon implements 
Writable {
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
             AnalysisInfo analysisInfo = AnalysisInfo.read(in);
+            // Unfinished manual once job/tasks doesn't need to keep in memory 
anymore.
+            if (needAbandon(analysisInfo)) {
+                continue;
+            }
             map.put(job ? analysisInfo.jobId : analysisInfo.taskId, 
analysisInfo);
         }
     }
 
+    // Need to abandon the unfinished manual once jobs/tasks while loading 
image and replay journal.
+    // Journal only store finished tasks and jobs.
+    public static boolean needAbandon(AnalysisInfo analysisInfo) {
+        if (analysisInfo == null) {
+            return true;
+        }
+        if ((AnalysisState.PENDING.equals(analysisInfo.state) || 
AnalysisState.RUNNING.equals(analysisInfo.state))
+                && ScheduleType.ONCE.equals(analysisInfo.scheduleType)
+                && JobType.MANUAL.equals(analysisInfo.jobType)) {
+            return true;
+        }
+        return false;
+    }
+
     private static void readIdToTblStats(DataInput in, Map<Long, TableStats> 
map) throws IOException {
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index edadf4c17b..a563e6a6fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -170,6 +170,9 @@ public abstract class BaseAnalysisTask {
                 doExecute();
                 break;
             } catch (Throwable t) {
+                if (killed) {
+                    throw new RuntimeException(t);
+                }
                 LOG.warn("Failed to execute analysis task, retried times: {}", 
retriedTimes++, t);
                 if (retriedTimes > 
StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
                     throw new RuntimeException(t);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 2146722db9..67d1a5dc44 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -261,7 +261,7 @@ public class AnalysisManagerTest {
         analysisManager.buildAndAssignJob(analyzeTblStmt);
         new Expectations() {
             {
-                analysisManager.persistAnalysisJob(analysisInfo);
+                analysisManager.recordAnalysisJob(analysisInfo);
                 times = 1;
             }
         };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to