This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new f5d29933c94 [improve](backup) Add the commit seq at the backup job level. #44049 (#44111)
f5d29933c94 is described below

commit f5d29933c947aad1d34d9200dfc32a4965a744c1
Author: walter <maoch...@selectdb.com>
AuthorDate: Tue Nov 19 12:14:54 2024 +0800

    [improve](backup) Add the commit seq at the backup job level. #44049 (#44111)
    
    cherry pick from #44049
---
 .../org/apache/doris/backup/BackupHandler.java     | 43 ++++++++++++++++------
 .../java/org/apache/doris/backup/BackupJob.java    | 34 +++++++++++++----
 .../java/org/apache/doris/backup/Snapshot.java     | 11 +++++-
 .../java/org/apache/doris/catalog/Database.java    |  4 ++
 .../java/org/apache/doris/persist/BarrierLog.java  |  5 +++
 .../apache/doris/service/FrontendServiceImpl.java  |  6 ++-
 .../org/apache/doris/backup/BackupJobTest.java     |  4 +-
 gensrc/thrift/FrontendService.thrift               |  1 +
 8 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 3015e366c3e..61d1e9e5f13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -49,6 +49,7 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -308,20 +309,32 @@ public class BackupHandler extends MasterDaemon implements Writable {
                     + " is read only");
         }
 
-        // Determine the tables to be backed up
+        long commitSeq = 0;
         Set<String> tableNames = Sets.newHashSet();
         AbstractBackupTableRefClause abstractBackupTableRefClause = 
stmt.getAbstractBackupTableRefClause();
-        if (abstractBackupTableRefClause == null) {
-            tableNames = db.getTableNamesWithLock();
-        } else if (abstractBackupTableRefClause.isExclude()) {
-            tableNames = db.getTableNamesWithLock();
-            for (TableRef tableRef : 
abstractBackupTableRefClause.getTableRefList()) {
-                if (!tableNames.remove(tableRef.getName().getTbl())) {
-                    LOG.info("exclude table " + tableRef.getName().getTbl()
-                            + " of backup stmt is not exists in db " + 
db.getFullName());
+
+        // Obtain the snapshot commit seq, any creating table binlog will be 
visible.
+        db.readLock();
+        try {
+            BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
+            commitSeq = env.getEditLog().logBarrier(log);
+
+            // Determine the tables to be backed up
+            if (abstractBackupTableRefClause == null) {
+                tableNames = db.getTableNames();
+            } else if (abstractBackupTableRefClause.isExclude()) {
+                tableNames = db.getTableNames();
+                for (TableRef tableRef : 
abstractBackupTableRefClause.getTableRefList()) {
+                    if (!tableNames.remove(tableRef.getName().getTbl())) {
+                        LOG.info("exclude table " + tableRef.getName().getTbl()
+                                + " of backup stmt is not exists in db " + 
db.getFullName());
+                    }
                 }
             }
+        } finally {
+            db.readUnlock();
         }
+
         List<TableRef> tblRefs = Lists.newArrayList();
         if (abstractBackupTableRefClause != null && 
!abstractBackupTableRefClause.isExclude()) {
             tblRefs = abstractBackupTableRefClause.getTableRefList();
@@ -339,6 +352,14 @@ public class BackupHandler extends MasterDaemon implements Writable {
         for (TableRef tblRef : tblRefs) {
             String tblName = tblRef.getName().getTbl();
             Table tbl = db.getTableOrDdlException(tblName);
+
+            // filter the table types which are not supported by local backup.
+            if (repository == null && tbl.getType() != TableType.OLAP
+                    && tbl.getType() != TableType.VIEW && tbl.getType() != 
TableType.MATERIALIZED_VIEW) {
+                tblRefsNotSupport.add(tblRef);
+                continue;
+            }
+
             if (tbl.getType() == TableType.VIEW || tbl.getType() == 
TableType.ODBC) {
                 continue;
             }
@@ -385,7 +406,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
         tblRefs.removeAll(tblRefsNotSupport);
 
         // Check if label already be used
-        long repoId = -1;
+        long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
         if (repository != null) {
             List<String> existSnapshotNames = Lists.newArrayList();
             Status st = repository.listSnapshots(existSnapshotNames);
@@ -407,7 +428,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
         // Create a backup job
         BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
                 ClusterNamespace.getNameFromFullName(db.getFullName()),
-                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
+                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, 
commitSeq);
         // write log
         env.getEditLog().logBackupJob(backupJob);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 8a5043ebe7f..59365a24931 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -36,6 +36,7 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.persist.BarrierLog;
@@ -85,6 +86,7 @@ import java.util.zip.GZIPOutputStream;
 public class BackupJob extends AbstractJob {
     private static final Logger LOG = LogManager.getLogger(BackupJob.class);
     private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
+    private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";
 
     public enum BackupJobState {
         PENDING, // Job is newly created. Send snapshot tasks and save copied 
meta info, then transfer to SNAPSHOTING
@@ -123,6 +125,8 @@ public class BackupJob extends AbstractJob {
     // backup properties && table commit seq with table id
     private Map<String, String> properties = Maps.newHashMap();
 
+    private long commitSeq = 0;
+
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -133,11 +137,13 @@ public class BackupJob extends AbstractJob {
     }
 
     public BackupJob(String label, long dbId, String dbName, List<TableRef> 
tableRefs, long timeoutMs,
-                     BackupContent content, Env env, long repoId) {
+                     BackupContent content, Env env, long repoId, long 
commitSeq) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
         this.tableRefs = tableRefs;
         this.state = BackupJobState.PENDING;
+        this.commitSeq = commitSeq;
         properties.put(BackupStmt.PROP_CONTENT, content.name());
+        properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
     }
 
     public BackupJobState getState() {
@@ -237,7 +243,7 @@ public class BackupJob extends AbstractJob {
             if (request.getTaskStatus().getStatusCode() == 
TStatusCode.TABLET_MISSING
                     && !tryNewTabletSnapshotTask(task)) {
                 status = new Status(ErrCode.NOT_FOUND,
-                        "make snapshot failed, failed to ge tablet, table will 
be droped or truncated");
+                        "make snapshot failed, failed to ge tablet, table will 
be dropped or truncated");
                 cancelInternal();
             }
 
@@ -379,6 +385,14 @@ public class BackupJob extends AbstractJob {
 
         LOG.debug("run backup job: {}", this);
 
+        if (state == BackupJobState.PENDING) {
+            String pausedLabel = 
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
+            if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
+                LOG.info("pause pending backup job by debug point: {}", this);
+                return;
+            }
+        }
+
         // run job base on current state
         switch (state) {
             case PENDING:
@@ -526,7 +540,7 @@ public class BackupJob extends AbstractJob {
 
     private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, 
OlapTable olapTable,
             TableRef backupTableRef, AgentBatchTask batchTask) {
-        // Add barrier editolog for barrier commit seq
+        // Add barrier editlog for barrier commit seq
         long dbId = db.getId();
         String dbName = db.getFullName();
         long tableId = olapTable.getId();
@@ -656,13 +670,11 @@ public class BackupJob extends AbstractJob {
 
     private void waitingAllSnapshotsFinished() {
         if (unfinishedTaskIds.isEmpty()) {
-
             if (env.getEditLog().exceedMaxJournalSize(this)) {
                 status = new Status(ErrCode.COMMON_ERROR, "backupJob is too 
large ");
                 return;
             }
 
-
             snapshotFinishedTime = System.currentTimeMillis();
             state = BackupJobState.UPLOAD_SNAPSHOT;
 
@@ -972,6 +984,10 @@ public class BackupJob extends AbstractJob {
         return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     // read meta and job info bytes from disk, and return the snapshot
     public synchronized Snapshot getSnapshot() {
         if (state != BackupJobState.FINISHED || repoId != 
Repository.KEEP_ON_LOCAL_REPO_ID) {
@@ -981,7 +997,7 @@ public class BackupJob extends AbstractJob {
         // Avoid loading expired meta.
         long expiredAt = createTime + timeoutMs;
         if (System.currentTimeMillis() >= expiredAt) {
-            return new Snapshot(label, new byte[0], new byte[0], expiredAt);
+            return new Snapshot(label, new byte[0], new byte[0], expiredAt, 
commitSeq);
         }
 
         try {
@@ -989,7 +1005,7 @@ public class BackupJob extends AbstractJob {
             File jobInfoFile = new File(localJobInfoFilePath);
             byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
             byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
-            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
+            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, 
commitSeq);
         } catch (IOException e) {
             LOG.warn("failed to load meta info and job info file, meta info 
file {}, job info file {}: ",
                     localMetaInfoFilePath, localJobInfoFilePath, e);
@@ -1182,6 +1198,10 @@ public class BackupJob extends AbstractJob {
             String value = Text.readString(in);
             properties.put(key, value);
         }
+
+        if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
+            commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index c4c93548177..a9f734dbc99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -34,14 +34,18 @@ public class Snapshot {
     @SerializedName(value = "expired_at")
     private long expiredAt = 0;
 
+    @SerializedName(value = "commitSeq")
+    private long commitSeq = 0;
+
     public Snapshot() {
     }
 
-    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) 
{
+    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, 
long commitSeq) {
         this.label = label;
         this.meta = meta;
         this.jobInfo = jobInfo;
         this.expiredAt = expiredAt;
+        this.commitSeq = commitSeq;
     }
 
     public byte[] getMeta() {
@@ -60,6 +64,10 @@ public class Snapshot {
         return System.currentTimeMillis() > expiredAt;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
@@ -71,6 +79,7 @@ public class Snapshot {
                 + ", meta=" + meta
                 + ", jobInfo=" + jobInfo
                 + ", expiredAt=" + expiredAt
+                + ", commitSeq=" + commitSeq
                 + '}';
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index b2181093832..b81068d475c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -518,6 +518,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
         }
     }
 
+    public Set<String> getTableNames() {
+        return new HashSet<>(this.nameToTable.keySet());
+    }
+
     /**
      * This is a thread-safe method when nameToTable is a concurrent hash map
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 2b4245b290c..4a9ce13e03b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
     public BarrierLog() {
     }
 
+    public BarrierLog(long dbId, String dbName) {
+        this.dbId = dbId;
+        this.dbName = dbName;
+    }
+
     public BarrierLog(long dbId, String dbName, long tableId, String 
tableName) {
         this.dbId = dbId;
         this.dbName = dbName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8c521cc73b0..e67534d302b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2820,9 +2820,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             byte[] meta = snapshot.getMeta();
             byte[] jobInfo = snapshot.getJobInfo();
             long expiredAt = snapshot.getExpiredAt();
+            long commitSeq = snapshot.getCommitSeq();
 
-            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, expired at: {}",
-                    label, meta.length, jobInfo.length, expiredAt);
+            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, "
+                    + "expired at: {}, commit seq: {}", label, meta.length, 
jobInfo.length, expiredAt, commitSeq);
             if (request.isEnableCompress()) {
                 meta = GZIPUtils.compress(meta);
                 jobInfo = GZIPUtils.compress(jobInfo);
@@ -2835,6 +2836,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             result.setMeta(meta);
             result.setJobInfo(jobInfo);
             result.setExpiredAt(expiredAt);
+            result.setCommitSeq(commitSeq);
         }
 
         return result;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 4e0eecda1fa..dd9cb075293 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -212,7 +212,7 @@ public class BackupJobTest {
                 new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
                 null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
     }
 
     @Test
@@ -348,7 +348,7 @@ public class BackupJobTest {
                 new TableRef(new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, 
"unknown_tbl"),
                         null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
         job.run();
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, 
job.getStatus().getErrCode());
         Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a75275bd917..b58f8a42e89 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1204,6 +1204,7 @@ struct TGetSnapshotResult {
     4: optional Types.TNetworkAddress master_address
     5: optional bool compressed;
     6: optional i64 expiredAt;  // in millis
+    7: optional i64 commit_seq;
 }
 
 struct TTableRef {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to