This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d252fca2b6b branch-3.0: [improve](backup) Add the commit seq at the 
backup job level. #44049 (#44065)
d252fca2b6b is described below

commit d252fca2b6b0974965f7bb6b36ae379fb4f46b92
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Nov 17 21:07:06 2024 +0800

    branch-3.0: [improve](backup) Add the commit seq at the backup job level. 
#44049 (#44065)
    
    Cherry-picked from #44049
    
    Co-authored-by: walter <[email protected]>
---
 .../org/apache/doris/backup/BackupHandler.java     | 43 ++++++++++++++++------
 .../java/org/apache/doris/backup/BackupJob.java    | 41 +++++++++++++++++----
 .../java/org/apache/doris/backup/Snapshot.java     | 11 +++++-
 .../java/org/apache/doris/catalog/Database.java    |  4 ++
 .../java/org/apache/doris/persist/BarrierLog.java  |  5 +++
 .../apache/doris/service/FrontendServiceImpl.java  |  6 ++-
 .../org/apache/doris/backup/BackupJobTest.java     |  7 ++--
 gensrc/thrift/FrontendService.thrift               |  1 +
 8 files changed, 93 insertions(+), 25 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d585176e3d4..7cad2f98500 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -52,6 +52,7 @@ import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.AzureFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -370,20 +371,32 @@ public class BackupHandler extends MasterDaemon 
implements Writable {
                     + " is read only");
         }
 
-        // Determine the tables to be backed up
+        long commitSeq = 0;
         Set<String> tableNames = Sets.newHashSet();
         AbstractBackupTableRefClause abstractBackupTableRefClause = 
stmt.getAbstractBackupTableRefClause();
-        if (abstractBackupTableRefClause == null) {
-            tableNames = db.getTableNamesWithLock();
-        } else if (abstractBackupTableRefClause.isExclude()) {
-            tableNames = db.getTableNamesWithLock();
-            for (TableRef tableRef : 
abstractBackupTableRefClause.getTableRefList()) {
-                if (!tableNames.remove(tableRef.getName().getTbl())) {
-                    LOG.info("exclude table " + tableRef.getName().getTbl()
-                            + " of backup stmt is not exists in db " + 
db.getFullName());
+
+        // Obtain the snapshot commit seq, any creating table binlog will be 
visible.
+        db.readLock();
+        try {
+            BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
+            commitSeq = env.getEditLog().logBarrier(log);
+
+            // Determine the tables to be backed up
+            if (abstractBackupTableRefClause == null) {
+                tableNames = db.getTableNames();
+            } else if (abstractBackupTableRefClause.isExclude()) {
+                tableNames = db.getTableNames();
+                for (TableRef tableRef : 
abstractBackupTableRefClause.getTableRefList()) {
+                    if (!tableNames.remove(tableRef.getName().getTbl())) {
+                        LOG.info("exclude table " + tableRef.getName().getTbl()
+                                + " of backup stmt is not exists in db " + 
db.getFullName());
+                    }
                 }
             }
+        } finally {
+            db.readUnlock();
         }
+
         List<TableRef> tblRefs = Lists.newArrayList();
         if (abstractBackupTableRefClause != null && 
!abstractBackupTableRefClause.isExclude()) {
             tblRefs = abstractBackupTableRefClause.getTableRefList();
@@ -401,6 +414,14 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
         for (TableRef tblRef : tblRefs) {
             String tblName = tblRef.getName().getTbl();
             Table tbl = db.getTableOrDdlException(tblName);
+
+            // filter the table types which are not supported by local backup.
+            if (repository == null && tbl.getType() != TableType.OLAP
+                    && tbl.getType() != TableType.VIEW && tbl.getType() != 
TableType.MATERIALIZED_VIEW) {
+                tblRefsNotSupport.add(tblRef);
+                continue;
+            }
+
             if (tbl.getType() == TableType.VIEW || tbl.getType() == 
TableType.ODBC
                     || tbl.getType() == TableType.MATERIALIZED_VIEW) {
                 continue;
@@ -448,7 +469,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
         tblRefs.removeAll(tblRefsNotSupport);
 
         // Check if label already be used
-        long repoId = -1;
+        long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
         if (repository != null) {
             List<String> existSnapshotNames = Lists.newArrayList();
             Status st = repository.listSnapshots(existSnapshotNames);
@@ -470,7 +491,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
         // Create a backup job
         BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
                 ClusterNamespace.getNameFromFullName(db.getFullName()),
-                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
+                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, 
commitSeq);
         // write log
         env.getEditLog().logBackupJob(backupJob);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 8d71d2808ee..478e8902d7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -37,9 +37,11 @@ import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.persist.BarrierLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
@@ -80,9 +82,10 @@ import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
 
-public class BackupJob extends AbstractJob {
+public class BackupJob extends AbstractJob implements GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(BackupJob.class);
     private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
+    private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";
 
     public enum BackupJobState {
         PENDING, // Job is newly created. Send snapshot tasks and save copied 
meta info, then transfer to SNAPSHOTING
@@ -130,6 +133,8 @@ public class BackupJob extends AbstractJob {
     @SerializedName("prop")
     private Map<String, String> properties = Maps.newHashMap();
 
+    private long commitSeq = 0;
+
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -140,11 +145,13 @@ public class BackupJob extends AbstractJob {
     }
 
     public BackupJob(String label, long dbId, String dbName, List<TableRef> 
tableRefs, long timeoutMs,
-                     BackupContent content, Env env, long repoId) {
+                     BackupContent content, Env env, long repoId, long 
commitSeq) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
         this.tableRefs = tableRefs;
         this.state = BackupJobState.PENDING;
+        this.commitSeq = commitSeq;
         properties.put(BackupStmt.PROP_CONTENT, content.name());
+        properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
     }
 
     public BackupJobState getState() {
@@ -244,7 +251,7 @@ public class BackupJob extends AbstractJob {
             if (request.getTaskStatus().getStatusCode() == 
TStatusCode.TABLET_MISSING
                     && !tryNewTabletSnapshotTask(task)) {
                 status = new Status(ErrCode.NOT_FOUND,
-                        "make snapshot failed, failed to ge tablet, table will 
be droped or truncated");
+                        "make snapshot failed, failed to get tablet, table will 
be dropped or truncated");
                 cancelInternal();
             }
 
@@ -414,6 +421,14 @@ public class BackupJob extends AbstractJob {
             LOG.debug("run backup job: {}", this);
         }
 
+        if (state == BackupJobState.PENDING) {
+            String pausedLabel = 
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
+            if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
+                LOG.info("pause pending backup job by debug point: {}", this);
+                return;
+            }
+        }
+
         // run job base on current state
         switch (state) {
             case PENDING:
@@ -568,7 +583,7 @@ public class BackupJob extends AbstractJob {
 
     private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, 
OlapTable olapTable,
             TableRef backupTableRef, AgentBatchTask batchTask) {
-        // Add barrier editolog for barrier commit seq
+        // Add barrier editlog for barrier commit seq
         long dbId = db.getId();
         String dbName = db.getFullName();
         long tableId = olapTable.getId();
@@ -698,13 +713,11 @@ public class BackupJob extends AbstractJob {
 
     private void waitingAllSnapshotsFinished() {
         if (unfinishedTaskIds.isEmpty()) {
-
             if (env.getEditLog().exceedMaxJournalSize(this)) {
                 status = new Status(ErrCode.COMMON_ERROR, "backupJob is too 
large ");
                 return;
             }
 
-
             snapshotFinishedTime = System.currentTimeMillis();
             state = BackupJobState.UPLOAD_SNAPSHOT;
 
@@ -1021,6 +1034,10 @@ public class BackupJob extends AbstractJob {
         return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     // read meta and job info bytes from disk, and return the snapshot
     public synchronized Snapshot getSnapshot() {
         if (state != BackupJobState.FINISHED || repoId != 
Repository.KEEP_ON_LOCAL_REPO_ID) {
@@ -1030,7 +1047,7 @@ public class BackupJob extends AbstractJob {
         // Avoid loading expired meta.
         long expiredAt = createTime + timeoutMs;
         if (System.currentTimeMillis() >= expiredAt) {
-            return new Snapshot(label, new byte[0], new byte[0], expiredAt);
+            return new Snapshot(label, new byte[0], new byte[0], expiredAt, 
commitSeq);
         }
 
         try {
@@ -1038,7 +1055,7 @@ public class BackupJob extends AbstractJob {
             File jobInfoFile = new File(localJobInfoFilePath);
             byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
             byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
-            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
+            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, 
commitSeq);
         } catch (IOException e) {
             LOG.warn("failed to load meta info and job info file, meta info 
file {}, job info file {}: ",
                     localMetaInfoFilePath, localJobInfoFilePath, e);
@@ -1151,6 +1168,14 @@ public class BackupJob extends AbstractJob {
             String value = Text.readString(in);
             properties.put(key, value);
         }
+
+        gsonPostProcess();
+    }
+
+    public void gsonPostProcess() throws IOException {
+        if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
+            commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index c4c93548177..a9f734dbc99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -34,14 +34,18 @@ public class Snapshot {
     @SerializedName(value = "expired_at")
     private long expiredAt = 0;
 
+    @SerializedName(value = "commitSeq")
+    private long commitSeq = 0;
+
     public Snapshot() {
     }
 
-    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) 
{
+    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, 
long commitSeq) {
         this.label = label;
         this.meta = meta;
         this.jobInfo = jobInfo;
         this.expiredAt = expiredAt;
+        this.commitSeq = commitSeq;
     }
 
     public byte[] getMeta() {
@@ -60,6 +64,10 @@ public class Snapshot {
         return System.currentTimeMillis() > expiredAt;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
@@ -71,6 +79,7 @@ public class Snapshot {
                 + ", meta=" + meta
                 + ", jobInfo=" + jobInfo
                 + ", expiredAt=" + expiredAt
+                + ", commitSeq=" + commitSeq
                 + '}';
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 11a60360426..2cdd46e6227 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -538,6 +538,10 @@ public class Database extends MetaObject implements 
Writable, DatabaseIf<Table>,
         }
     }
 
+    public Set<String> getTableNames() {
+        return new HashSet<>(this.nameToTable.keySet());
+    }
+
     /**
      * This is a thread-safe method when nameToTable is a concurrent hash map
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 2b4245b290c..4a9ce13e03b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
     public BarrierLog() {
     }
 
+    public BarrierLog(long dbId, String dbName) {
+        this.dbId = dbId;
+        this.dbName = dbName;
+    }
+
     public BarrierLog(long dbId, String dbName, long tableId, String 
tableName) {
         this.dbId = dbId;
         this.dbName = dbName;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2e7980d80d4..e35fd2dc852 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2952,9 +2952,10 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             byte[] meta = snapshot.getMeta();
             byte[] jobInfo = snapshot.getJobInfo();
             long expiredAt = snapshot.getExpiredAt();
+            long commitSeq = snapshot.getCommitSeq();
 
-            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, expired at: {}",
-                    label, meta.length, jobInfo.length, expiredAt);
+            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, "
+                    + "expired at: {}, commit seq: {}", label, meta.length, 
jobInfo.length, expiredAt, commitSeq);
             if (request.isEnableCompress()) {
                 meta = GZIPUtils.compress(meta);
                 jobInfo = GZIPUtils.compress(jobInfo);
@@ -2967,6 +2968,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.setMeta(meta);
             result.setJobInfo(jobInfo);
             result.setExpiredAt(expiredAt);
+            result.setCommitSeq(commitSeq);
         }
 
         return result;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 68f02e8eee8..1e22a6cb5af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -215,7 +215,7 @@ public class BackupJobTest {
                 new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, 
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
                 null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
     }
 
     @Test
@@ -351,7 +351,7 @@ public class BackupJobTest {
                 new TableRef(new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, 
"unknown_tbl"),
                         null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
         job.run();
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, 
job.getStatus().getErrCode());
         Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
@@ -368,7 +368,7 @@ public class BackupJobTest {
                 new TableRef(new 
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, 
UnitTestUtil.TABLE_NAME),
                         null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 
13600 * 1000, BackupStmt.BackupContent.ALL,
-            env, repo.getId());
+            env, repo.getId(), 123);
 
         job.write(out);
         out.flush();
@@ -383,6 +383,7 @@ public class BackupJobTest {
         Assert.assertEquals(job.getDbId(), job2.getDbId());
         Assert.assertEquals(job.getCreateTime(), job2.getCreateTime());
         Assert.assertEquals(job.getType(), job2.getType());
+        Assert.assertEquals(job.getCommitSeq(), job2.getCommitSeq());
 
         // 3. delete files
         in.close();
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 181b632e43f..daf02cdf46b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1363,6 +1363,7 @@ struct TGetSnapshotResult {
     4: optional Types.TNetworkAddress master_address
     5: optional bool compressed;
     6: optional i64 expiredAt;  // in millis
+    7: optional i64 commit_seq;
 }
 
 struct TTableRef {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to