This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b9ad347 [Feature] Support recording custom number of backup and
restore task information (#5947)
b9ad347 is described below
commit b9ad34736df16d95cf98347393a79ac332627031
Author: Hao Tan <[email protected]>
AuthorDate: Tue Jun 22 09:19:54 2021 +0800
[Feature] Support recording custom number of backup and restore task
information (#5947)
* Record all backup jobs and support where clause
---
docs/en/administrator-guide/config/fe_config.md | 5 +
docs/zh-CN/administrator-guide/config/fe_config.md | 6 ++
fe/fe-core/src/main/cup/sql_parser.cup | 4 +-
.../org/apache/doris/analysis/ShowBackupStmt.java | 89 +++++++++++++++-
.../org/apache/doris/analysis/ShowRestoreStmt.java | 82 ++++++++++++++-
.../org/apache/doris/backup/BackupHandler.java | 117 ++++++++++++++++-----
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../java/org/apache/doris/qe/ShowExecutor.java | 30 +++---
8 files changed, 287 insertions(+), 52 deletions(-)
diff --git a/docs/en/administrator-guide/config/fe_config.md
b/docs/en/administrator-guide/config/fe_config.md
index 2cf55f6..db4cd53 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -496,6 +496,11 @@ This will limit the max recursion depth of hash
distribution pruner.
So that distribution pruner will no work and just return all buckets.
Increase the depth can support distribution pruning for more elements,
but may cost more CPU.
+### max_backup_restore_job_num_per_db
+
+Default: 10
+
+This configuration is mainly used to control the number of backup/restore
tasks recorded in each database.
### using_old_load_usage_pattern
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md
b/docs/zh-CN/administrator-guide/config/fe_config.md
index 289d2b3..612704f 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -545,6 +545,12 @@ SmallFileMgr 中存储的最大文件数
最大 Routine Load 作业数,包括 NEED_SCHEDULED, RUNNING, PAUSE
+### max_backup_restore_job_num_per_db
+
+默认值:10
+
+此配置用于控制每个 DB 能够记录的 backup/restore 任务的数量
+
### max_running_txn_num_per_db
默认值:100
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index ffdf533..7831841 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2509,9 +2509,9 @@ show_param ::=
{:
RESULT = new ShowUserPropertyStmt(user, parser.wild);
:}
- | KW_BACKUP opt_db:db
+ | KW_BACKUP opt_db:db opt_wild_where
{:
- RESULT = new ShowBackupStmt(db);
+ RESULT = new ShowBackupStmt(db, parser.where);
:}
| KW_RESTORE opt_db:db opt_wild_where
{:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java
index 6137697..053d5c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBackupStmt.java
@@ -21,8 +21,11 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -31,6 +34,8 @@ import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import java.util.function.Predicate;
+
public class ShowBackupStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("JobId").add("SnapshotName").add("DbName").add("State").add("BackupObjs").add("CreateTime")
@@ -39,9 +44,13 @@ public class ShowBackupStmt extends ShowStmt {
.build();
private String dbName;
+ private final Expr where;
+ private boolean isAccurateMatch;
+ private String labelValue;
- public ShowBackupStmt(String dbName) {
+ public ShowBackupStmt(String dbName, Expr where) {
this.dbName = dbName;
+ this.where = where;
}
public String getDbName() {
@@ -65,6 +74,56 @@ public class ShowBackupStmt extends ShowStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
ConnectContext.get().getQualifiedUser(), dbName);
}
+
+ if (where == null) {
+ return;
+ }
+ boolean valid = analyzeWhereClause();
+ if (!valid) {
+ throw new AnalysisException("Where clause should like: LABEL =
\"your_label_name\", "
+ + " or LABEL LIKE \"matcher\"");
+ }
+ }
+
+ private boolean analyzeWhereClause() {
+ if (!(where instanceof LikePredicate) && !(where instanceof
BinaryPredicate)) {
+ return false;
+ }
+
+ if (where instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) where;
+ if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) {
+ return false;
+ }
+ isAccurateMatch = true;
+ }
+
+ if (where instanceof LikePredicate) {
+ LikePredicate likePredicate = (LikePredicate) where;
+ if (LikePredicate.Operator.LIKE != likePredicate.getOp()) {
+ return false;
+ }
+ }
+
+ // left child
+ if (!(where.getChild(0) instanceof SlotRef)) {
+ return false;
+ }
+ String leftKey = ((SlotRef) where.getChild(0)).getColumnName();
+ if (!"label".equalsIgnoreCase(leftKey)) {
+ return false;
+ }
+
+ // right child
+ if (!(where.getChild(1) instanceof StringLiteral)) {
+ return false;
+ }
+ labelValue = ((StringLiteral) where.getChild(1)).getStringValue();
+ if (Strings.isNullOrEmpty(labelValue)) {
+ return false;
+ }
+
+ return true;
}
@Override
@@ -84,6 +143,10 @@ public class ShowBackupStmt extends ShowStmt {
builder.append(" FROM `").append(dbName).append("` ");
}
+ if (where != null) {
+ builder.append(where.toSql());
+ }
+
return builder.toString();
}
@@ -96,4 +159,28 @@ public class ShowBackupStmt extends ShowStmt {
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
+
+ public boolean isAccurateMatch() {
+ return isAccurateMatch;
+ }
+
+ public String getLabelValue() {
+ return labelValue;
+ }
+
+ public Expr getWhere() {
+ return where;
+ }
+
+ public Predicate<String> getLabelPredicate() throws AnalysisException {
+ if (null == where) {
+ return label -> true;
+ }
+ if (isAccurateMatch) {
+ return CaseSensibility.LABEL.getCaseSensibility() ? label ->
label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue);
+ } else {
+ PatternMatcher patternMatcher =
PatternMatcher.createMysqlPattern(labelValue,
CaseSensibility.LABEL.getCaseSensibility());
+ return patternMatcher::match;
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java
index 9426898..b47a0a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRestoreStmt.java
@@ -21,8 +21,11 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -31,6 +34,8 @@ import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import java.util.function.Predicate;
+
public class ShowRestoreStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("JobId").add("Label").add("Timestamp").add("DbName").add("State")
@@ -42,7 +47,8 @@ public class ShowRestoreStmt extends ShowStmt {
private String dbName;
private Expr where;
- private String label;
+ private String labelValue;
+ private boolean isAccurateMatch;
public ShowRestoreStmt(String dbName, Expr where) {
this.dbName = dbName;
@@ -53,8 +59,8 @@ public class ShowRestoreStmt extends ShowStmt {
return dbName;
}
- public String getLabel() {
- return label;
+ public String getLabelValue() {
+ return labelValue;
}
@Override
@@ -74,6 +80,56 @@ public class ShowRestoreStmt extends ShowStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
ConnectContext.get().getQualifiedUser(), dbName);
}
+
+ if (where == null) {
+ return;
+ }
+ boolean valid = analyzeWhereClause();
+ if (!valid) {
+ throw new AnalysisException("Where clause should like: LABEL =
\"your_label_name\", "
+ + " or LABEL LIKE \"matcher\"");
+ }
+ }
+
+ private boolean analyzeWhereClause() {
+ if (!(where instanceof LikePredicate) && !(where instanceof
BinaryPredicate)) {
+ return false;
+ }
+
+ if (where instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) where;
+ if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) {
+ return false;
+ }
+ isAccurateMatch = true;
+ }
+
+ if (where instanceof LikePredicate) {
+ LikePredicate likePredicate = (LikePredicate) where;
+ if (LikePredicate.Operator.LIKE != likePredicate.getOp()) {
+ return false;
+ }
+ }
+
+ // left child
+ if (!(where.getChild(0) instanceof SlotRef)) {
+ return false;
+ }
+ String leftKey = ((SlotRef) where.getChild(0)).getColumnName();
+ if (!"label".equalsIgnoreCase(leftKey)) {
+ return false;
+ }
+
+ // right child
+ if (!(where.getChild(1) instanceof StringLiteral)) {
+ return false;
+ }
+ labelValue = ((StringLiteral) where.getChild(1)).getStringValue();
+ if (Strings.isNullOrEmpty(labelValue)) {
+ return false;
+ }
+
+ return true;
}
@Override
@@ -106,5 +162,25 @@ public class ShowRestoreStmt extends ShowStmt {
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
+
+ public boolean isAccurateMatch() {
+ return isAccurateMatch;
+ }
+
+ public Expr getWhere() {
+ return where;
+ }
+
+ public Predicate<String> getLabelPredicate() throws AnalysisException {
+ if (null == where) {
+ return label -> true;
+ }
+ if (isAccurateMatch) {
+ return CaseSensibility.LABEL.getCaseSensibility() ? label ->
label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue);
+ } else {
+ PatternMatcher patternMatcher =
PatternMatcher.createMysqlPattern(labelValue,
CaseSensibility.LABEL.getCaseSensibility());
+ return patternMatcher::match;
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index bf5e43c..5b18f2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -55,9 +55,9 @@ import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,11 +67,16 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
public class BackupHandler extends MasterDaemon implements Writable {
private static final Logger LOG =
LogManager.getLogger(BackupHandler.class);
@@ -82,13 +87,14 @@ public class BackupHandler extends MasterDaemon implements
Writable {
private RepositoryMgr repoMgr = new RepositoryMgr();
- // db id -> last running or finished backup/restore jobs
- // We only save the last backup/restore job of a database.
+ // this lock is used for updating dbIdToBackupOrRestoreJobs
+ private final ReentrantLock jobLock = new ReentrantLock();
+
+ // db id -> last 10(max_backup_restore_job_num_per_db) backup/restore jobs
// Newly submitted job will replace the current job, only if current job
is finished or cancelled.
// If the last job is finished, user can get the job info from repository.
If the last job is cancelled,
// user can get the error message before submitting the next one.
- // Use ConcurrentMap to get rid of locks.
- private Map<Long, AbstractJob> dbIdToBackupOrRestoreJob =
Maps.newConcurrentMap();
+ private final Map<Long, Deque<AbstractJob>> dbIdToBackupOrRestoreJobs =
new HashMap<>();
// this lock is used for handling one backup or restore request at a time.
private ReentrantLock seqlock = new ReentrantLock();
@@ -154,7 +160,19 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
public AbstractJob getJob(long dbId) {
- return dbIdToBackupOrRestoreJob.get(dbId);
+ return getCurrentJob(dbId);
+ }
+
+ public List<AbstractJob> getJobs(long dbId, Predicate<String> predicate) {
+ jobLock.lock();
+ try {
+ return dbIdToBackupOrRestoreJobs.getOrDefault(dbId, new
LinkedList<>())
+ .stream()
+ .filter(e -> predicate.test(e.getLabel()))
+ .collect(Collectors.toList());
+ } finally {
+ jobLock.unlock();
+ }
}
@Override
@@ -165,7 +183,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
}
- for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
+ for (AbstractJob job : getAllCurrentJobs()) {
job.setCatalog(catalog);
job.run();
}
@@ -197,8 +215,8 @@ public class BackupHandler extends MasterDaemon implements
Writable {
if (repo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Repository does not exist");
}
-
- for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
+
+ for (AbstractJob job : getAllCurrentJobs()) {
if (!job.isDone() && job.getRepoId() == repo.getId()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Backup or restore job is
running on this repository."
@@ -239,7 +257,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
tryLock();
try {
// Check if there is backup or restore job running on this database
- AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId());
+ AbstractJob currentJob = getCurrentJob(db.getId());
if (currentJob != null && !currentJob.isDone()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can only run one backup or
restore job of a database at same time");
@@ -364,7 +382,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
catalog.getEditLog().logBackupJob(backupJob);
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the
state of job may be changed.
- dbIdToBackupOrRestoreJob.put(db.getId(), backupJob);
+ addBackupOrRestoreJob(db.getId(), backupJob);
LOG.info("finished to submit backup job: {}", backupJob);
}
@@ -392,11 +410,51 @@ public class BackupHandler extends MasterDaemon
implements Writable {
catalog.getEditLog().logRestoreJob(restoreJob);
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the
state of job may be changed.
- dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob);
+ addBackupOrRestoreJob(db.getId(), restoreJob);
LOG.info("finished to submit restore job: {}", restoreJob);
}
+ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
+ jobLock.lock();
+ try {
+ Deque<AbstractJob> jobs =
dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
+ while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
+ jobs.removeFirst();
+ }
+ AbstractJob lastJob = jobs.peekLast();
+
+ // Remove duplicate jobs and keep only the latest status
+ // Otherwise, the tasks that have been successfully executed will
be repeated when replaying edit log.
+ if (lastJob != null && (lastJob.isPending() || lastJob.getJobId()
== job.getJobId())) {
+ jobs.removeLast();
+ }
+ jobs.addLast(job);
+ } finally {
+ jobLock.unlock();
+ }
+ }
+
+ private List<AbstractJob> getAllCurrentJobs() {
+ jobLock.lock();
+ try {
+ return
dbIdToBackupOrRestoreJobs.values().stream().filter(CollectionUtils::isNotEmpty)
+ .map(Deque::getLast).collect(Collectors.toList());
+ } finally {
+ jobLock.unlock();
+ }
+ }
+
+ private AbstractJob getCurrentJob(long dbId) {
+ jobLock.lock();
+ try {
+ Deque<AbstractJob> jobs =
dbIdToBackupOrRestoreJobs.getOrDefault(dbId, Lists.newLinkedList());
+ return jobs.isEmpty() ? null : jobs.getLast();
+ } finally {
+ jobLock.unlock();
+ }
+ }
+
private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo
jobInfo,
AbstractBackupTableRefClause backupTableRefClause)
throws DdlException {
@@ -490,8 +548,8 @@ public class BackupHandler extends MasterDaemon implements
Writable {
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
-
- AbstractJob job = dbIdToBackupOrRestoreJob.get(db.getId());
+
+ AbstractJob job = getCurrentJob(db.getId());
if (job == null || (job instanceof BackupJob && stmt.isRestore())
|| (job instanceof RestoreJob && !stmt.isRestore())) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No "
@@ -508,7 +566,8 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
public boolean handleFinishedSnapshotTask(SnapshotTask task,
TFinishTaskRequest request) {
- AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
+ AbstractJob job = getCurrentJob(task.getDbId());
+
if (job == null) {
LOG.warn("failed to find backup or restore job for task: {}",
task);
// return true to remove this task from AgentTaskQueue
@@ -533,7 +592,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
public boolean handleFinishedSnapshotUploadTask(UploadTask task,
TFinishTaskRequest request) {
- AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
+ AbstractJob job = getCurrentJob(task.getDbId());
if (job == null || (job instanceof RestoreJob)) {
LOG.info("invalid upload task: {}, no backup job is found. db id:
{}", task, task.getDbId());
return false;
@@ -548,8 +607,8 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
public boolean handleDownloadSnapshotTask(DownloadTask task,
TFinishTaskRequest request) {
- AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
- if (job == null || !(job instanceof RestoreJob)) {
+ AbstractJob job = getCurrentJob(task.getDbId());
+ if (!(job instanceof RestoreJob)) {
LOG.warn("failed to find restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
return true;
@@ -559,8 +618,8 @@ public class BackupHandler extends MasterDaemon implements
Writable {
}
public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest
request) {
- AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
- if (job == null || !(job instanceof RestoreJob)) {
+ AbstractJob job = getCurrentJob(task.getDbId());
+ if (!(job instanceof RestoreJob)) {
LOG.warn("failed to find restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
return true;
@@ -571,16 +630,16 @@ public class BackupHandler extends MasterDaemon
implements Writable {
public void replayAddJob(AbstractJob job) {
if (job.isCancelled()) {
- AbstractJob existingJob =
dbIdToBackupOrRestoreJob.get(job.getDbId());
+ AbstractJob existingJob = getCurrentJob(job.getDbId());
if (existingJob == null || existingJob.isDone()) {
LOG.error("invalid existing job: {}. current replay job is:
{}",
- existingJob, job);
+ existingJob, job);
return;
}
existingJob.setCatalog(catalog);
existingJob.replayCancel();
} else if (!job.isPending()) {
- AbstractJob existingJob =
dbIdToBackupOrRestoreJob.get(job.getDbId());
+ AbstractJob existingJob = getCurrentJob(job.getDbId());
if (existingJob == null || existingJob.isDone()) {
LOG.error("invalid existing job: {}. current replay job is:
{}",
existingJob, job);
@@ -591,11 +650,12 @@ public class BackupHandler extends MasterDaemon
implements Writable {
// for example: In restore job, PENDING will transfer to
SNAPSHOTING, not DOWNLOAD.
job.replayRun();
}
- dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
+
+ addBackupOrRestoreJob(job.getDbId(), job);
}
public boolean report(TTaskType type, long jobId, long taskId, int
finishedNum, int totalNum) {
- for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
+ for (AbstractJob job : getAllCurrentJobs()) {
if (job.getType() == JobType.BACKUP) {
if (!job.isDone() && job.getJobId() == jobId && type ==
TTaskType.UPLOAD) {
job.taskProgress.put(taskId, Pair.create(finishedNum,
totalNum));
@@ -621,8 +681,9 @@ public class BackupHandler extends MasterDaemon implements
Writable {
public void write(DataOutput out) throws IOException {
repoMgr.write(out);
- out.writeInt(dbIdToBackupOrRestoreJob.size());
- for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
+ List<AbstractJob> jobs =
dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList());
+ out.writeInt(jobs.size());
+ for (AbstractJob job : jobs) {
job.write(out);
}
}
@@ -633,7 +694,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
int size = in.readInt();
for (int i = 0; i < size; i++) {
AbstractJob job = AbstractJob.read(in);
- dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
+ addBackupOrRestoreJob(job.getDbId(), job);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 5a6ab5f..fc511db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1402,4 +1402,10 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_dynamic_partition_num = 500;
+
+ /*
+ * Control the max num of backup/restore job per db
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int max_backup_restore_job_num_per_db = 10;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 1e9a006..42e035c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -1614,16 +1614,13 @@ public class ShowExecutor {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR,
showStmt.getDbName());
}
- AbstractJob jobI =
Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId());
- if (!(jobI instanceof BackupJob)) {
- resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET);
- return;
- }
+ List<AbstractJob> jobs =
Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(),
showStmt.getLabelPredicate());
+
+ List<BackupJob> backupJobs = jobs.stream().filter(job -> job
instanceof BackupJob)
+ .map(job -> (BackupJob) job).collect(Collectors.toList());
+
+ List<List<String>> infos =
backupJobs.stream().map(BackupJob::getInfo).collect(Collectors.toList());
- BackupJob backupJob = (BackupJob) jobI;
- List<String> info = backupJob.getInfo();
- List<List<String>> infos = Lists.newArrayList();
- infos.add(info);
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}
@@ -1634,16 +1631,13 @@ public class ShowExecutor {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR,
showStmt.getDbName());
}
- AbstractJob jobI =
Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId());
- if (!(jobI instanceof RestoreJob)) {
- resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET);
- return;
- }
+ List<AbstractJob> jobs =
Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(),
showStmt.getLabelPredicate());
+
+ List<RestoreJob> restoreJobs = jobs.stream().filter(job -> job
instanceof RestoreJob)
+ .map(job -> (RestoreJob) job).collect(Collectors.toList());
+
+ List<List<String>> infos =
restoreJobs.stream().map(RestoreJob::getInfo).collect(Collectors.toList());
- RestoreJob restoreJob = (RestoreJob) jobI;
- List<String> info = restoreJob.getInfo();
- List<List<String>> infos = Lists.newArrayList();
- infos.add(info);
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]