This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6685875e72 [Improvement](statistics)Mark unfinished analysis job
failed after master reboot
6685875e72 is described below
commit 6685875e72292505c2bcdfe639784c5e89d06ea2
Author: Jibing-Li <[email protected]>
AuthorDate: Thu Sep 21 23:11:50 2023 +0800
[Improvement](statistics)Mark unfinished analysis job failed after master
reboot
Previously, Pending/Running analysis tasks/jobs stayed in an unfinished
status forever after an FE reboot, which was misleading.
In this PR, jobs/tasks are logged to the edit log only when they finish, so
unfinished tasks/jobs are abandoned after a reboot.
Also, return without retrying when an ANALYZE TABLE running in sync mode is
cancelled by the user.
---
.../doris/analysis/ShowAnalyzeTaskStatus.java | 5 ++++
.../java/org/apache/doris/persist/EditLog.java | 13 ++++++--
.../apache/doris/statistics/AnalysisManager.java | 35 ++++++++++++++++------
.../apache/doris/statistics/BaseAnalysisTask.java | 3 ++
.../doris/statistics/AnalysisManagerTest.java | 2 +-
5 files changed, 46 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
index 927a56d19d..7c6c5cf17f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeTaskStatus.java
@@ -59,4 +59,9 @@ public class ShowAnalyzeTaskStatus extends ShowStmt {
public long getJobId() {
return jobId;
}
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.FORWARD_NO_SYNC;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 25d6bd089d..6543173f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -86,6 +86,7 @@ import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -1070,11 +1071,19 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_ANALYSIS_JOB: {
-
env.getAnalysisManager().replayCreateAnalysisJob((AnalysisInfo)
journal.getData());
+ AnalysisInfo info = (AnalysisInfo) journal.getData();
+ if (AnalysisManager.needAbandon(info)) {
+ break;
+ }
+ env.getAnalysisManager().replayCreateAnalysisJob(info);
break;
}
case OperationType.OP_CREATE_ANALYSIS_TASK: {
-
env.getAnalysisManager().replayCreateAnalysisTask((AnalysisInfo)
journal.getData());
+ AnalysisInfo info = (AnalysisInfo) journal.getData();
+ if (AnalysisManager.needAbandon(info)) {
+ break;
+ }
+ env.getAnalysisManager().replayCreateAnalysisTask(info);
break;
}
case OperationType.OP_DELETE_ANALYSIS_JOB: {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 83c3cc84e4..ce0c546675 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -151,7 +151,7 @@ public class AnalysisManager extends Daemon implements
Writable {
// Set the job state to RUNNING when its first task becomes
RUNNING.
if (info.state.equals(AnalysisState.RUNNING) &&
job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
- logCreateAnalysisJob(job);
+ replayCreateAnalysisJob(job);
}
boolean allFinished = true;
boolean hasFailure = false;
@@ -372,7 +372,7 @@ public class AnalysisManager extends Daemon implements
Writable {
updateTableStats(jobInfo);
return null;
}
- persistAnalysisJob(jobInfo);
+ recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
// TODO: maybe we should update table stats only when all task
succeeded.
updateTableStats(jobInfo);
@@ -554,13 +554,13 @@ public class AnalysisManager extends Daemon implements
Writable {
}
@VisibleForTesting
- public void persistAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
+ public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
if (jobInfo.scheduleType == ScheduleType.PERIOD &&
jobInfo.lastExecTimeInMs > 0) {
return;
}
AnalysisInfoBuilder jobInfoBuilder = new AnalysisInfoBuilder(jobInfo);
AnalysisInfo analysisInfo = jobInfoBuilder.setTaskId(-1).build();
- logCreateAnalysisJob(analysisInfo);
+ replayCreateAnalysisJob(analysisInfo);
}
public void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long,
BaseAnalysisTask> analysisTasks,
@@ -584,7 +584,7 @@ public class AnalysisManager extends Daemon implements
Writable {
}
try {
if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
- logCreateAnalysisTask(analysisInfo);
+ replayCreateAnalysisTask(analysisInfo);
}
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
@@ -623,7 +623,7 @@ public class AnalysisManager extends Daemon implements
Writable {
return;
}
try {
- logCreateAnalysisTask(analysisInfo);
+ replayCreateAnalysisTask(analysisInfo);
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
}
@@ -833,9 +833,8 @@ public class AnalysisManager extends Daemon implements
Writable {
executor.submit(() -> {
try {
if (cancelled) {
- errorMessages.add("Cancelled since query timeout,"
- + "you could set could query_timeout or
parallel_sync_analyze_task_num "
- + "to a bigger value and try again");
+ errorMessages.add("Query timeout or user
cancelled."
+ + "Could set analyze_timeout to a bigger
value.");
return;
}
try {
@@ -927,10 +926,28 @@ public class AnalysisManager extends Daemon implements
Writable {
int size = in.readInt();
for (int i = 0; i < size; i++) {
AnalysisInfo analysisInfo = AnalysisInfo.read(in);
+ // Unfinished manual ONCE jobs/tasks no longer need to be kept in memory.
+ if (needAbandon(analysisInfo)) {
+ continue;
+ }
map.put(job ? analysisInfo.jobId : analysisInfo.taskId,
analysisInfo);
}
}
+ // Unfinished manual ONCE jobs/tasks must be abandoned while loading the image and replaying the journal.
+ // The journal is meant to store only finished tasks and jobs; unfinished entries are dropped here.
+ public static boolean needAbandon(AnalysisInfo analysisInfo) {
+ if (analysisInfo == null) {
+ return true;
+ }
+ if ((AnalysisState.PENDING.equals(analysisInfo.state) ||
AnalysisState.RUNNING.equals(analysisInfo.state))
+ && ScheduleType.ONCE.equals(analysisInfo.scheduleType)
+ && JobType.MANUAL.equals(analysisInfo.jobType)) {
+ return true;
+ }
+ return false;
+ }
+
private static void readIdToTblStats(DataInput in, Map<Long, TableStats>
map) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index edadf4c17b..a563e6a6fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -170,6 +170,9 @@ public abstract class BaseAnalysisTask {
doExecute();
break;
} catch (Throwable t) {
+ if (killed) {
+ throw new RuntimeException(t);
+ }
LOG.warn("Failed to execute analysis task, retried times: {}",
retriedTimes++, t);
if (retriedTimes >
StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
throw new RuntimeException(t);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 2146722db9..67d1a5dc44 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -261,7 +261,7 @@ public class AnalysisManagerTest {
analysisManager.buildAndAssignJob(analyzeTblStmt);
new Expectations() {
{
- analysisManager.persistAnalysisJob(analysisInfo);
+ analysisManager.recordAnalysisJob(analysisInfo);
times = 1;
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]