This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 02fb899bf32 [improvement](statistic)Improve auto analyze visibility.
(#29046) (#29294)
02fb899bf32 is described below
commit 02fb899bf32510b7aa94eb340e4861cfe237ad16
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Dec 29 23:23:12 2023 +0800
[improvement](statistic)Improve auto analyze visibility. (#29046) (#29294)
---
.../java/org/apache/doris/qe/ShowExecutor.java | 12 +-
.../apache/doris/statistics/AnalysisManager.java | 228 ++++++---------------
.../doris/statistics/StatisticsAutoCollector.java | 22 +-
.../doris/statistics/StatisticsCollector.java | 4 -
.../doris/statistics/AnalysisManagerTest.java | 114 +----------
5 files changed, 88 insertions(+), 292 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index adb344cd897..8ebd4a7bfa7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2618,8 +2618,7 @@ public class ShowExecutor {
private void handleShowAnalyze() {
ShowAnalyzeStmt showStmt = (ShowAnalyzeStmt) stmt;
- List<AnalysisInfo> results = Env.getCurrentEnv().getAnalysisManager()
- .showAnalysisJob(showStmt);
+ List<AnalysisInfo> results =
Env.getCurrentEnv().getAnalysisManager().showAnalysisJob(showStmt);
List<List<String>> resultRows = Lists.newArrayList();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
for (AnalysisInfo analysisInfo : results) {
@@ -2645,14 +2644,7 @@ public class ShowExecutor {
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
- try {
- row.add(showStmt.isAuto()
- ? analysisInfo.progress
- :
Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
- } catch (Exception e) {
- row.add("N/A");
- LOG.warn("Failed to get progress for job: {}",
analysisInfo, e);
- }
+
row.add(Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
row.add(analysisInfo.scheduleType.toString());
LocalDateTime startTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index ac4a83a6b28..63ae94e37d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -78,7 +78,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -96,8 +95,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
public class AnalysisManager implements Writable {
@@ -119,9 +116,6 @@ public class AnalysisManager implements Writable {
protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
- // Tracking system submitted job, keep in mem only
- protected final Map<Long, AnalysisInfo> systemJobInfoMap = new
ConcurrentHashMap<>();
-
// Tracking and control sync analyze tasks, keep in mem only
private final ConcurrentMap<ConnectContext, SyncTaskCollection>
ctxToSyncTask = new ConcurrentHashMap<>();
@@ -129,123 +123,11 @@ public class AnalysisManager implements Writable {
private final Map<Long, AnalysisJob> idToAnalysisJob = new
ConcurrentHashMap<>();
+ // To be deprecated, keep it for meta compatibility now, will remove later.
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null,
this);
- private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w
-> {
- AnalysisInfo info = w.info;
- AnalysisState taskState = w.taskState;
- String message = w.message;
- long time = w.time;
- if (analysisJobIdToTaskMap.get(info.jobId) == null) {
- return null;
- }
- info.state = taskState;
- info.message = message;
- // Update the task cost time when task finished or failed. And only
log the final state.
- if (taskState.equals(AnalysisState.FINISHED) ||
taskState.equals(AnalysisState.FAILED)) {
- info.timeCostInMs = time - info.lastExecTimeInMs;
- info.lastExecTimeInMs = time;
- logCreateAnalysisTask(info);
- }
- info.lastExecTimeInMs = time;
- AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
- // Job may get deleted during execution.
- if (job == null) {
- return null;
- }
- // Synchronize the job state change in job level.
- synchronized (job) {
- job.lastExecTimeInMs = time;
- // Set the job state to RUNNING when its first task becomes
RUNNING.
- if (info.state.equals(AnalysisState.RUNNING) &&
job.state.equals(AnalysisState.PENDING)) {
- job.state = AnalysisState.RUNNING;
- job.markStartTime(System.currentTimeMillis());
- replayCreateAnalysisJob(job);
- }
- boolean allFinished = true;
- boolean hasFailure = false;
- for (BaseAnalysisTask task :
analysisJobIdToTaskMap.get(info.jobId).values()) {
- AnalysisInfo taskInfo = task.info;
- if (taskInfo.state.equals(AnalysisState.RUNNING) ||
taskInfo.state.equals(AnalysisState.PENDING)) {
- allFinished = false;
- break;
- }
- if (taskInfo.state.equals(AnalysisState.FAILED)) {
- hasFailure = true;
- }
- }
- if (allFinished) {
- if (hasFailure) {
- job.markFailed();
- } else {
- job.markFinished();
- try {
- updateTableStats(job);
- } catch (Throwable e) {
- LOG.warn("Failed to update Table statistics in job:
{}", info.toString(), e);
- }
- }
- logCreateAnalysisJob(job);
- analysisJobIdToTaskMap.remove(job.jobId);
- }
- }
- return null;
- };
-
private final String progressDisplayTemplate = "%d Finished | %d Failed
| %d In Progress | %d Total";
- protected final Function<TaskStatusWrapper, Void> systemJobStatusUpdater =
w -> {
- AnalysisInfo info = w.info;
- info.state = w.taskState;
- info.message = w.message;
- AnalysisInfo job = systemJobInfoMap.get(info.jobId);
- if (job == null) {
- return null;
- }
- synchronized (job) {
- // Set the job state to RUNNING when its first task becomes
RUNNING.
- if (info.state.equals(AnalysisState.RUNNING) &&
job.state.equals(AnalysisState.PENDING)) {
- job.state = AnalysisState.RUNNING;
- job.markStartTime(System.currentTimeMillis());
- }
- }
- int failedCount = 0;
- StringJoiner reason = new StringJoiner(", ");
- Map<Long, BaseAnalysisTask> taskMap =
analysisJobIdToTaskMap.get(info.jobId);
- for (BaseAnalysisTask task : taskMap.values()) {
- if (task.info.state.equals(AnalysisState.RUNNING) ||
task.info.state.equals(AnalysisState.PENDING)) {
- return null;
- }
- if (task.info.state.equals(AnalysisState.FAILED)) {
- failedCount++;
- reason.add(task.info.message);
- }
- }
- try {
- updateTableStats(job);
- } catch (Throwable e) {
- LOG.warn("Failed to update Table statistics in job: {}",
info.toString(), e);
- } finally {
- job.lastExecTimeInMs = System.currentTimeMillis();
- job.message = reason.toString();
- job.progress = String.format(progressDisplayTemplate,
- taskMap.size() - failedCount, failedCount, 0,
taskMap.size());
- if (failedCount > 0) {
- job.message = reason.toString();
- job.markFailed();
- } else {
- job.markFinished();
- }
- autoJobs.offer(job);
- systemJobInfoMap.remove(info.jobId);
- analysisJobIdToTaskMap.remove(info.jobId);
- }
- return null;
- };
-
- private final Function<TaskStatusWrapper, Void>[] updaters =
- new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
-
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
@@ -524,7 +406,7 @@ public class AnalysisManager implements Writable {
}
@VisibleForTesting
- public void recordAnalysisJob(AnalysisInfo jobInfo) throws DdlException {
+ public void recordAnalysisJob(AnalysisInfo jobInfo) {
if (jobInfo.scheduleType == ScheduleType.PERIOD &&
jobInfo.lastExecTimeInMs > 0) {
return;
}
@@ -554,13 +436,7 @@ public class AnalysisManager implements Writable {
if (isSync) {
continue;
}
- try {
- if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
- replayCreateAnalysisTask(analysisInfo);
- }
- } catch (Exception e) {
- throw new DdlException("Failed to create analysis task", e);
- }
+ replayCreateAnalysisTask(analysisInfo);
}
}
@@ -594,16 +470,63 @@ public class AnalysisManager implements Writable {
// For sync job, don't need to persist, return here and execute it
immediately.
return;
}
- try {
- replayCreateAnalysisTask(analysisInfo);
- } catch (Exception e) {
- throw new DdlException("Failed to create analysis task", e);
- }
+ replayCreateAnalysisTask(analysisInfo);
}
public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState,
String message, long time) {
- TaskStatusWrapper taskStatusWrapper = new TaskStatusWrapper(info,
taskState, message, time);
- updaters[info.jobType.ordinal()].apply(taskStatusWrapper);
+ if (analysisJobIdToTaskMap.get(info.jobId) == null) {
+ return;
+ }
+ info.state = taskState;
+ info.message = message;
+ // Update the task cost time when task finished or failed. And only
log the final state.
+ if (taskState.equals(AnalysisState.FINISHED) ||
taskState.equals(AnalysisState.FAILED)) {
+ info.timeCostInMs = time - info.lastExecTimeInMs;
+ info.lastExecTimeInMs = time;
+ logCreateAnalysisTask(info);
+ }
+ info.lastExecTimeInMs = time;
+ AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
+ // Job may get deleted during execution.
+ if (job == null) {
+ return;
+ }
+ // Synchronize the job state change in job level.
+ synchronized (job) {
+ job.lastExecTimeInMs = time;
+ // Set the job state to RUNNING when its first task becomes
RUNNING.
+ if (info.state.equals(AnalysisState.RUNNING) &&
job.state.equals(AnalysisState.PENDING)) {
+ job.state = AnalysisState.RUNNING;
+ job.markStartTime(System.currentTimeMillis());
+ replayCreateAnalysisJob(job);
+ }
+ boolean allFinished = true;
+ boolean hasFailure = false;
+ for (BaseAnalysisTask task :
analysisJobIdToTaskMap.get(info.jobId).values()) {
+ AnalysisInfo taskInfo = task.info;
+ if (taskInfo.state.equals(AnalysisState.RUNNING) ||
taskInfo.state.equals(AnalysisState.PENDING)) {
+ allFinished = false;
+ break;
+ }
+ if (taskInfo.state.equals(AnalysisState.FAILED)) {
+ hasFailure = true;
+ }
+ }
+ if (allFinished) {
+ if (hasFailure) {
+ job.markFailed();
+ } else {
+ job.markFinished();
+ try {
+ updateTableStats(job);
+ } catch (Throwable e) {
+ LOG.warn("Failed to update Table statistics in job:
{}", info.toString(), e);
+ }
+ }
+ logCreateAnalysisJob(job);
+ analysisJobIdToTaskMap.remove(job.jobId);
+ }
+ }
}
@VisibleForTesting
@@ -629,12 +552,6 @@ public class AnalysisManager implements Writable {
}
public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
- if (stmt.isAuto()) {
- // It's ok to sync on this field, it would only be assigned when
instance init or do checkpoint
- synchronized (autoJobs) {
- return findShowAnalyzeResult(autoJobs, stmt);
- }
- }
return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt);
}
@@ -650,6 +567,8 @@ public class AnalysisManager implements Writable {
.filter(a -> stmt.getJobId() == 0 || a.jobId ==
stmt.getJobId())
.filter(a -> state == null ||
a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.tblId == tblId)
+ .filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM)
+ || !stmt.isAuto() &&
a.jobType.equals(JobType.MANUAL))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
}
@@ -884,24 +803,6 @@ public class AnalysisManager implements Writable {
}
}
- public List<AnalysisInfo> findPeriodicJobs() {
- synchronized (analysisJobInfoMap) {
- Predicate<AnalysisInfo> p = a -> {
- if (a.state.equals(AnalysisState.RUNNING)) {
- return false;
- }
- if (a.cronExpression == null) {
- return a.scheduleType.equals(ScheduleType.PERIOD)
- && System.currentTimeMillis() - a.lastExecTimeInMs
> a.periodTimeInMs;
- }
- return a.cronExpression.getTimeAfter(new
Date(a.lastExecTimeInMs)).before(new Date());
- };
- return analysisJobInfoMap.values().stream()
- .filter(p)
- .collect(Collectors.toList());
- }
- }
-
public List<AnalysisInfo> findTasks(long jobId) {
synchronized (analysisTaskInfoMap) {
return analysisTaskInfoMap.values().stream().filter(i -> i.jobId
== jobId).collect(Collectors.toList());
@@ -979,10 +880,11 @@ public class AnalysisManager implements Writable {
}
}
+ // To be deprecated, keep it for meta compatibility now, will remove later.
private static void readAutoJobs(DataInput in, AnalysisManager
analysisManager) throws IOException {
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
- Collection<AnalysisInfo> autoJobs =
GsonUtils.GSON.fromJson(Text.readString(in), type);
- analysisManager.autoJobs = analysisManager.createSimpleQueue(autoJobs,
analysisManager);
+ GsonUtils.GSON.fromJson(Text.readString(in), type);
+ analysisManager.autoJobs = analysisManager.createSimpleQueue(null,
null);
}
@Override
@@ -1054,7 +956,7 @@ public class AnalysisManager implements Writable {
}
public void registerSysJob(AnalysisInfo jobInfo, Map<Long,
BaseAnalysisTask> taskInfos) {
- systemJobInfoMap.put(jobInfo.jobId, jobInfo);
+ recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}
@@ -1126,10 +1028,6 @@ public class AnalysisManager implements Writable {
idToAnalysisJob.remove(id);
}
- public boolean hasUnFinished() {
- return !analysisJobIdToTaskMap.isEmpty();
- }
-
/**
* Only OlapTable and Hive HMSExternalTable can sample for now.
* @param table
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index fc6e7fb3e2c..5bbd0eac3ce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -39,7 +39,6 @@ import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,12 +70,12 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
@SuppressWarnings({"rawtypes", "unchecked"})
private void analyzeAll() {
- Set<CatalogIf> catalogs =
Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog();
+ List<CatalogIf> catalogs = getCatalogsInOrder();
for (CatalogIf ctl : catalogs) {
if (!ctl.enableAutoAnalyze()) {
continue;
}
- Collection<DatabaseIf> dbs = ctl.getAllDbs();
+ List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
for (DatabaseIf<TableIf> databaseIf : dbs) {
if
(StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) {
continue;
@@ -91,6 +90,21 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
}
}
+    public List<CatalogIf> getCatalogsInOrder() {
+        return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
+                .sorted((c1, c2) -> Long.compare(c1.getId(), c2.getId())).collect(Collectors.toList());
+    }
+
+    public List<DatabaseIf> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
+        return catalog.getAllDbs().stream()
+                .sorted((d1, d2) -> Long.compare(d1.getId(), d2.getId())).collect(Collectors.toList());
+    }
+
+    public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
+        return db.getTables().stream()
+                .sorted((t1, t2) -> Long.compare(t1.getId(), t2.getId())).collect(Collectors.toList());
+    }
+
public void analyzeDb(DatabaseIf<TableIf> databaseIf) throws DdlException {
List<AnalysisInfo> analysisInfos = constructAnalysisInfo(databaseIf);
for (AnalysisInfo analysisInfo : analysisInfos) {
@@ -111,7 +125,7 @@ public class StatisticsAutoCollector extends
StatisticsCollector {
protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends
TableIf> db) {
List<AnalysisInfo> analysisInfos = new ArrayList<>();
- for (TableIf table : db.getTables()) {
+ for (TableIf table : getTablesInOrder(db)) {
try {
if (skip(table)) {
continue;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 9d4c311523b..0985b9b2b95 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -52,10 +52,6 @@ public abstract class StatisticsCollector extends
MasterDaemon {
if (Env.isCheckpointThread()) {
return;
}
- if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
- LOG.info("Analyze tasks those submitted in last time is not
finished, skip");
- return;
- }
collect();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
index 7c57a67f889..7a6247f0a1d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java
@@ -57,12 +57,13 @@ public class AnalysisManagerTest {
new MockUp<AnalysisManager>() {
@Mock
- public void logCreateAnalysisTask(AnalysisInfo job) {
- }
+ public void logCreateAnalysisTask(AnalysisInfo job) {}
@Mock
- public void logCreateAnalysisJob(AnalysisInfo job) {
- }
+ public void logCreateAnalysisJob(AnalysisInfo job) {}
+
+ @Mock
+ public void updateTableStats(AnalysisInfo jobInfo) {}
};
@@ -269,111 +270,6 @@ public class AnalysisManagerTest {
};
}
- @Test
- public void testSystemJobStatusUpdater() {
- new MockUp<BaseAnalysisTask>() {
-
- @Mock
- protected void init(AnalysisInfo info) {
-
- }
- };
-
- new MockUp<AnalysisManager>() {
- @Mock
- public void updateTableStats(AnalysisInfo jobInfo) {}
-
- @Mock
- protected void logAutoJob(AnalysisInfo autoJob) {
-
- }
- };
-
- AnalysisManager analysisManager = new AnalysisManager();
- AnalysisInfo job = new AnalysisInfoBuilder()
- .setJobId(0)
- .setColName("col1, col2").build();
- analysisManager.systemJobInfoMap.put(job.jobId, job);
- AnalysisInfo task1 = new AnalysisInfoBuilder()
- .setJobId(0)
- .setTaskId(1)
- .setState(AnalysisState.RUNNING)
- .setColName("col1").build();
- AnalysisInfo task2 = new AnalysisInfoBuilder()
- .setJobId(0)
- .setTaskId(1)
- .setState(AnalysisState.FINISHED)
- .setColName("col2").build();
- OlapAnalysisTask ot1 = new OlapAnalysisTask(task1);
- OlapAnalysisTask ot2 = new OlapAnalysisTask(task2);
- Map<Long, BaseAnalysisTask> taskMap = new HashMap<>();
- taskMap.put(ot1.info.taskId, ot1);
- taskMap.put(ot2.info.taskId, ot2);
- analysisManager.analysisJobIdToTaskMap.put(job.jobId, taskMap);
-
- // test invalid job
- AnalysisInfo invalidJob = new
AnalysisInfoBuilder().setJobId(-1).build();
- analysisManager.systemJobStatusUpdater.apply(new
TaskStatusWrapper(invalidJob,
- AnalysisState.FAILED, "", 0));
-
- // test finished
- analysisManager.systemJobStatusUpdater.apply(new
TaskStatusWrapper(task1, AnalysisState.FAILED, "", 0));
- analysisManager.systemJobStatusUpdater.apply(new
TaskStatusWrapper(task1, AnalysisState.FINISHED, "", 0));
- Assertions.assertEquals(1, analysisManager.autoJobs.size());
- Assertions.assertTrue(analysisManager.systemJobInfoMap.isEmpty());
- }
-
- @Test
- public void testSystemJobStartTime() {
- new MockUp<BaseAnalysisTask>() {
-
- @Mock
- protected void init(AnalysisInfo info) {
-
- }
- };
-
- new MockUp<AnalysisManager>() {
- @Mock
- public void updateTableStats(AnalysisInfo jobInfo) {
- }
-
- @Mock
- protected void logAutoJob(AnalysisInfo autoJob) {
-
- }
- };
-
- AnalysisManager analysisManager = new AnalysisManager();
- AnalysisInfo job = new AnalysisInfoBuilder()
- .setJobId(0)
- .setColName("col1, col2").build();
- analysisManager.systemJobInfoMap.put(job.jobId, job);
- AnalysisInfo task1 = new AnalysisInfoBuilder()
- .setJobId(0)
- .setTaskId(1)
- .setState(AnalysisState.PENDING)
- .setColName("col1").build();
- AnalysisInfo task2 = new AnalysisInfoBuilder()
- .setJobId(0)
- .setTaskId(1)
- .setState(AnalysisState.PENDING)
- .setColName("col2").build();
- OlapAnalysisTask ot1 = new OlapAnalysisTask(task1);
- OlapAnalysisTask ot2 = new OlapAnalysisTask(task2);
- Map<Long, BaseAnalysisTask> taskMap = new HashMap<>();
- taskMap.put(ot1.info.taskId, ot1);
- taskMap.put(ot2.info.taskId, ot2);
- analysisManager.analysisJobIdToTaskMap.put(job.jobId, taskMap);
-
- job.state = AnalysisState.PENDING;
- long l = System.currentTimeMillis();
- analysisManager.systemJobInfoMap.put(job.jobId, job);
- analysisManager.systemJobStatusUpdater.apply(new
TaskStatusWrapper(task1,
- AnalysisState.RUNNING, "", 0));
- Assertions.assertTrue(job.startTime >= l);
- }
-
@Test
public void testReAnalyze() {
new MockUp<OlapTable>() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]