This is an automated email from the ASF dual-hosted git repository. w41ter pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new f5d29933c94 [improve](backup) Add the commit seq at the backup job level. #44049 (#44111) f5d29933c94 is described below commit f5d29933c947aad1d34d9200dfc32a4965a744c1 Author: walter <maoch...@selectdb.com> AuthorDate: Tue Nov 19 12:14:54 2024 +0800 [improve](backup) Add the commit seq at the backup job level. #44049 (#44111) cherry pick from #44049 --- .../org/apache/doris/backup/BackupHandler.java | 43 ++++++++++++++++------ .../java/org/apache/doris/backup/BackupJob.java | 34 +++++++++++++---- .../java/org/apache/doris/backup/Snapshot.java | 11 +++++- .../java/org/apache/doris/catalog/Database.java | 4 ++ .../java/org/apache/doris/persist/BarrierLog.java | 5 +++ .../apache/doris/service/FrontendServiceImpl.java | 6 ++- .../org/apache/doris/backup/BackupJobTest.java | 4 +- gensrc/thrift/FrontendService.thrift | 1 + 8 files changed, 85 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 3015e366c3e..61d1e9e5f13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -49,6 +49,7 @@ import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.persist.BarrierLog; import org.apache.doris.task.DirMoveTask; import org.apache.doris.task.DownloadTask; import org.apache.doris.task.SnapshotTask; @@ -308,20 +309,32 @@ public class BackupHandler extends MasterDaemon implements Writable { + " is read only"); } - // Determine the tables to be backed up + long commitSeq = 0; Set<String> tableNames = Sets.newHashSet(); AbstractBackupTableRefClause abstractBackupTableRefClause = stmt.getAbstractBackupTableRefClause(); - if (abstractBackupTableRefClause == null) { - tableNames = db.getTableNamesWithLock(); - } else if (abstractBackupTableRefClause.isExclude()) { - tableNames = db.getTableNamesWithLock(); - for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) { - if (!tableNames.remove(tableRef.getName().getTbl())) { - LOG.info("exclude table " + tableRef.getName().getTbl() - + " of backup stmt is not exists in db " + db.getFullName()); + + // Obtain the snapshot commit seq, any creating table binlog will be visible. + db.readLock(); + try { + BarrierLog log = new BarrierLog(db.getId(), db.getFullName()); + commitSeq = env.getEditLog().logBarrier(log); + + // Determine the tables to be backed up + if (abstractBackupTableRefClause == null) { + tableNames = db.getTableNames(); + } else if (abstractBackupTableRefClause.isExclude()) { + tableNames = db.getTableNames(); + for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) { + if (!tableNames.remove(tableRef.getName().getTbl())) { + LOG.info("exclude table " + tableRef.getName().getTbl() + + " of backup stmt is not exists in db " + db.getFullName()); + } } } + } finally { + db.readUnlock(); } + List<TableRef> tblRefs = Lists.newArrayList(); if (abstractBackupTableRefClause != null && !abstractBackupTableRefClause.isExclude()) { tblRefs = abstractBackupTableRefClause.getTableRefList(); @@ -339,6 +352,14 @@ public class BackupHandler extends MasterDaemon implements Writable { for (TableRef tblRef : tblRefs) { String tblName = tblRef.getName().getTbl(); Table tbl = db.getTableOrDdlException(tblName); + + // filter the table types which are not supported by local backup. + if (repository == null && tbl.getType() != TableType.OLAP + && tbl.getType() != TableType.VIEW && tbl.getType() != TableType.MATERIALIZED_VIEW) { + tblRefsNotSupport.add(tblRef); + continue; + } + if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC) { continue; } @@ -385,7 +406,7 @@ public class BackupHandler extends MasterDaemon implements Writable { tblRefs.removeAll(tblRefsNotSupport); // Check if label already be used - long repoId = -1; + long repoId = Repository.KEEP_ON_LOCAL_REPO_ID; if (repository != null) { List<String> existSnapshotNames = Lists.newArrayList(); Status st = repository.listSnapshots(existSnapshotNames); @@ -407,7 +428,7 @@ public class BackupHandler extends MasterDaemon implements Writable { // Create a backup job BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(), ClusterNamespace.getNameFromFullName(db.getFullName()), - tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId); + tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, commitSeq); // write log env.getEditLog().logBackupJob(backupJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 8a5043ebe7f..59365a24931 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -36,6 +36,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.persist.BarrierLog; @@ -85,6 +86,7 @@ import java.util.zip.GZIPOutputStream; public class BackupJob extends AbstractJob { private static final Logger LOG = LogManager.getLogger(BackupJob.class); private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:"; + private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq"; public enum BackupJobState { PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING @@ -123,6 +125,8 @@ public class BackupJob extends AbstractJob { // backup properties && table commit seq with table id private Map<String, String> properties = Maps.newHashMap(); + private long commitSeq = 0; + public BackupJob() { super(JobType.BACKUP); } @@ -133,11 +137,13 @@ public class BackupJob extends AbstractJob { } public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs, - BackupContent content, Env env, long repoId) { + BackupContent content, Env env, long repoId, long commitSeq) { super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId); this.tableRefs = tableRefs; this.state = BackupJobState.PENDING; + this.commitSeq = commitSeq; properties.put(BackupStmt.PROP_CONTENT, content.name()); + properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq)); } public BackupJobState getState() { @@ -237,7 +243,7 @@ public class BackupJob extends AbstractJob { if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING && !tryNewTabletSnapshotTask(task)) { status = new Status(ErrCode.NOT_FOUND, - "make snapshot failed, failed to ge tablet, table will be droped or truncated"); + "make snapshot failed, failed to ge tablet, table will be dropped or truncated"); cancelInternal(); } @@ -379,6 +385,14 @@ public class BackupJob extends AbstractJob { LOG.debug("run backup job: {}", this); + if (state == BackupJobState.PENDING) { + String pausedLabel = DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", ""); + if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) { + LOG.info("pause pending backup job by debug point: {}", this); + return; + } + } + // run job base on current state switch (state) { case PENDING: @@ -526,7 +540,7 @@ public class BackupJob extends AbstractJob { private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable, TableRef backupTableRef, AgentBatchTask batchTask) { - // Add barrier editolog for barrier commit seq + // Add barrier editlog for barrier commit seq long dbId = db.getId(); String dbName = db.getFullName(); long tableId = olapTable.getId(); @@ -656,13 +670,11 @@ public class BackupJob extends AbstractJob { private void waitingAllSnapshotsFinished() { if (unfinishedTaskIds.isEmpty()) { - if (env.getEditLog().exceedMaxJournalSize(this)) { status = new Status(ErrCode.COMMON_ERROR, "backupJob is too large "); return; } - snapshotFinishedTime = System.currentTimeMillis(); state = BackupJobState.UPLOAD_SNAPSHOT; @@ -972,6 +984,10 @@ public class BackupJob extends AbstractJob { return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; } + public long getCommitSeq() { + return commitSeq; + } + // read meta and job info bytes from disk, and return the snapshot public synchronized Snapshot getSnapshot() { if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { @@ -981,7 +997,7 @@ public class BackupJob extends AbstractJob { // Avoid loading expired meta. long expiredAt = createTime + timeoutMs; if (System.currentTimeMillis() >= expiredAt) { - return new Snapshot(label, new byte[0], new byte[0], expiredAt); + return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq); } try { @@ -989,7 +1005,7 @@ public class BackupJob extends AbstractJob { File jobInfoFile = new File(localJobInfoFilePath); byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); - return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt); + return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq); } catch (IOException e) { LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", localMetaInfoFilePath, localJobInfoFilePath, e); @@ -1182,6 +1198,10 @@ public class BackupJob extends AbstractJob { String value = Text.readString(in); properties.put(key, value); } + + if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) { + commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ)); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java index c4c93548177..a9f734dbc99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java @@ -34,14 +34,18 @@ public class Snapshot { @SerializedName(value = "expired_at") private long expiredAt = 0; + @SerializedName(value = "commitSeq") + private long commitSeq = 0; + public Snapshot() { } - public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) { + public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) { this.label = label; this.meta = meta; this.jobInfo = jobInfo; this.expiredAt = expiredAt; + this.commitSeq = commitSeq; } public byte[] getMeta() { @@ -60,6 +64,10 @@ public class Snapshot { return System.currentTimeMillis() > expiredAt; } + public long getCommitSeq() { + return commitSeq; + } + public String toJson() { return GsonUtils.GSON.toJson(this); } @@ -71,6 +79,7 @@ public class Snapshot { + ", meta=" + meta + ", jobInfo=" + jobInfo + ", expiredAt=" + expiredAt + + ", commitSeq=" + commitSeq + '}'; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index b2181093832..b81068d475c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -518,6 +518,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> } } + public Set<String> getTableNames() { + return new HashSet<>(this.nameToTable.keySet()); + } + /** * This is a thread-safe method when nameToTable is a concurrent hash map */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java index 2b4245b290c..4a9ce13e03b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java @@ -46,6 +46,11 @@ public class BarrierLog implements Writable { public BarrierLog() { } + public BarrierLog(long dbId, String dbName) { + this.dbId = dbId; + this.dbName = dbName; + } + public BarrierLog(long dbId, String dbName, long tableId, String tableName) { this.dbId = dbId; this.dbName = dbName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8c521cc73b0..e67534d302b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2820,9 +2820,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { byte[] meta = snapshot.getMeta(); byte[] jobInfo = snapshot.getJobInfo(); long expiredAt = snapshot.getExpiredAt(); + long commitSeq = snapshot.getCommitSeq(); - LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, expired at: {}", - label, meta.length, jobInfo.length, expiredAt); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, " + + "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq); if (request.isEnableCompress()) { meta = GZIPUtils.compress(meta); jobInfo = GZIPUtils.compress(jobInfo); @@ -2835,6 +2836,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setMeta(meta); result.setJobInfo(jobInfo); result.setExpiredAt(expiredAt); + result.setCommitSeq(commitSeq); } return result; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java index 4e0eecda1fa..dd9cb075293 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java @@ -212,7 +212,7 @@ public class BackupJobTest { new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME), null)); job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL, - env, repo.getId()); + env, repo.getId(), 0); } @Test @@ -348,7 +348,7 @@ public class BackupJobTest { new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, "unknown_tbl"), null)); job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL, - env, repo.getId()); + env, repo.getId(), 0); job.run(); Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode()); Assert.assertEquals(BackupJobState.CANCELLED, job.getState()); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index a75275bd917..b58f8a42e89 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1204,6 +1204,7 @@ struct TGetSnapshotResult { 4: optional Types.TNetworkAddress master_address 5: optional bool compressed; 6: optional i64 expiredAt; // in millis + 7: optional i64 commit_seq; } struct TTableRef { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org