This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d252fca2b6b branch-3.0: [improve](backup) Add the commit seq at the
backup job level. #44049 (#44065)
d252fca2b6b is described below
commit d252fca2b6b0974965f7bb6b36ae379fb4f46b92
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Nov 17 21:07:06 2024 +0800
branch-3.0: [improve](backup) Add the commit seq at the backup job level.
#44049 (#44065)
Cherry-picked from #44049
Co-authored-by: walter <[email protected]>
---
.../org/apache/doris/backup/BackupHandler.java | 43 ++++++++++++++++------
.../java/org/apache/doris/backup/BackupJob.java | 41 +++++++++++++++++----
.../java/org/apache/doris/backup/Snapshot.java | 11 +++++-
.../java/org/apache/doris/catalog/Database.java | 4 ++
.../java/org/apache/doris/persist/BarrierLog.java | 5 +++
.../apache/doris/service/FrontendServiceImpl.java | 6 ++-
.../org/apache/doris/backup/BackupJobTest.java | 7 ++--
gensrc/thrift/FrontendService.thrift | 1 +
8 files changed, 93 insertions(+), 25 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d585176e3d4..7cad2f98500 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -52,6 +52,7 @@ import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
@@ -370,20 +371,32 @@ public class BackupHandler extends MasterDaemon
implements Writable {
+ " is read only");
}
- // Determine the tables to be backed up
+ long commitSeq = 0;
Set<String> tableNames = Sets.newHashSet();
AbstractBackupTableRefClause abstractBackupTableRefClause =
stmt.getAbstractBackupTableRefClause();
- if (abstractBackupTableRefClause == null) {
- tableNames = db.getTableNamesWithLock();
- } else if (abstractBackupTableRefClause.isExclude()) {
- tableNames = db.getTableNamesWithLock();
- for (TableRef tableRef :
abstractBackupTableRefClause.getTableRefList()) {
- if (!tableNames.remove(tableRef.getName().getTbl())) {
- LOG.info("exclude table " + tableRef.getName().getTbl()
- + " of backup stmt is not exists in db " +
db.getFullName());
+
+ // Obtain the snapshot commit seq, any creating table binlog will be
visible.
+ db.readLock();
+ try {
+ BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
+ commitSeq = env.getEditLog().logBarrier(log);
+
+ // Determine the tables to be backed up
+ if (abstractBackupTableRefClause == null) {
+ tableNames = db.getTableNames();
+ } else if (abstractBackupTableRefClause.isExclude()) {
+ tableNames = db.getTableNames();
+ for (TableRef tableRef :
abstractBackupTableRefClause.getTableRefList()) {
+ if (!tableNames.remove(tableRef.getName().getTbl())) {
+ LOG.info("exclude table " + tableRef.getName().getTbl()
+ + " of backup stmt is not exists in db " +
db.getFullName());
+ }
}
}
+ } finally {
+ db.readUnlock();
}
+
List<TableRef> tblRefs = Lists.newArrayList();
if (abstractBackupTableRefClause != null &&
!abstractBackupTableRefClause.isExclude()) {
tblRefs = abstractBackupTableRefClause.getTableRefList();
@@ -401,6 +414,14 @@ public class BackupHandler extends MasterDaemon implements
Writable {
for (TableRef tblRef : tblRefs) {
String tblName = tblRef.getName().getTbl();
Table tbl = db.getTableOrDdlException(tblName);
+
+ // filter the table types which are not supported by local backup.
+ if (repository == null && tbl.getType() != TableType.OLAP
+ && tbl.getType() != TableType.VIEW && tbl.getType() !=
TableType.MATERIALIZED_VIEW) {
+ tblRefsNotSupport.add(tblRef);
+ continue;
+ }
+
if (tbl.getType() == TableType.VIEW || tbl.getType() ==
TableType.ODBC
|| tbl.getType() == TableType.MATERIALIZED_VIEW) {
continue;
@@ -448,7 +469,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
tblRefs.removeAll(tblRefsNotSupport);
// Check if label already be used
- long repoId = -1;
+ long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
if (repository != null) {
List<String> existSnapshotNames = Lists.newArrayList();
Status st = repository.listSnapshots(existSnapshotNames);
@@ -470,7 +491,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
ClusterNamespace.getNameFromFullName(db.getFullName()),
- tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
+ tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId,
commitSeq);
// write log
env.getEditLog().logBackupJob(backupJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 8d71d2808ee..478e8902d7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -37,9 +37,11 @@ import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
@@ -80,9 +82,10 @@ import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
-public class BackupJob extends AbstractJob {
+public class BackupJob extends AbstractJob implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(BackupJob.class);
private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
+ private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";
public enum BackupJobState {
PENDING, // Job is newly created. Send snapshot tasks and save copied
meta info, then transfer to SNAPSHOTING
@@ -130,6 +133,8 @@ public class BackupJob extends AbstractJob {
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();
+ private long commitSeq = 0;
+
public BackupJob() {
super(JobType.BACKUP);
}
@@ -140,11 +145,13 @@ public class BackupJob extends AbstractJob {
}
public BackupJob(String label, long dbId, String dbName, List<TableRef>
tableRefs, long timeoutMs,
- BackupContent content, Env env, long repoId) {
+ BackupContent content, Env env, long repoId, long
commitSeq) {
super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
this.tableRefs = tableRefs;
this.state = BackupJobState.PENDING;
+ this.commitSeq = commitSeq;
properties.put(BackupStmt.PROP_CONTENT, content.name());
+ properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
}
public BackupJobState getState() {
@@ -244,7 +251,7 @@ public class BackupJob extends AbstractJob {
if (request.getTaskStatus().getStatusCode() ==
TStatusCode.TABLET_MISSING
&& !tryNewTabletSnapshotTask(task)) {
status = new Status(ErrCode.NOT_FOUND,
- "make snapshot failed, failed to ge tablet, table will
be droped or truncated");
+ "make snapshot failed, failed to ge tablet, table will
be dropped or truncated");
cancelInternal();
}
@@ -414,6 +421,14 @@ public class BackupJob extends AbstractJob {
LOG.debug("run backup job: {}", this);
}
+ if (state == BackupJobState.PENDING) {
+ String pausedLabel =
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
+ if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
+ LOG.info("pause pending backup job by debug point: {}", this);
+ return;
+ }
+ }
+
// run job base on current state
switch (state) {
case PENDING:
@@ -568,7 +583,7 @@ public class BackupJob extends AbstractJob {
private void prepareSnapshotTaskForOlapTableWithoutLock(Database db,
OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
- // Add barrier editolog for barrier commit seq
+ // Add barrier editlog for barrier commit seq
long dbId = db.getId();
String dbName = db.getFullName();
long tableId = olapTable.getId();
@@ -698,13 +713,11 @@ public class BackupJob extends AbstractJob {
private void waitingAllSnapshotsFinished() {
if (unfinishedTaskIds.isEmpty()) {
-
if (env.getEditLog().exceedMaxJournalSize(this)) {
status = new Status(ErrCode.COMMON_ERROR, "backupJob is too
large ");
return;
}
-
snapshotFinishedTime = System.currentTimeMillis();
state = BackupJobState.UPLOAD_SNAPSHOT;
@@ -1021,6 +1034,10 @@ public class BackupJob extends AbstractJob {
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}
+ public long getCommitSeq() { // Accessor for the job-level commit seq: set by the constructor, or re-read from properties in gsonPostProcess().
+ return commitSeq; // stays at the field default 0 when neither path assigned it
+ }
+
// read meta and job info bytes from disk, and return the snapshot
public synchronized Snapshot getSnapshot() {
if (state != BackupJobState.FINISHED || repoId !=
Repository.KEEP_ON_LOCAL_REPO_ID) {
@@ -1030,7 +1047,7 @@ public class BackupJob extends AbstractJob {
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
- return new Snapshot(label, new byte[0], new byte[0], expiredAt);
+ return new Snapshot(label, new byte[0], new byte[0], expiredAt,
commitSeq);
}
try {
@@ -1038,7 +1055,7 @@ public class BackupJob extends AbstractJob {
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
- return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
+ return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt,
commitSeq);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info
file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
@@ -1151,6 +1168,14 @@ public class BackupJob extends AbstractJob {
String value = Text.readString(in);
properties.put(key, value);
}
+
+ gsonPostProcess();
+ }
+
+ public void gsonPostProcess() throws IOException { // Rebuilds the commitSeq field from the persisted properties map after the job has been read back.
+ if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) { // key absent => commitSeq keeps its field default of 0
+ commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ)); // NOTE(review): a malformed value raises NumberFormatException (unchecked), not the declared IOException
+ }
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index c4c93548177..a9f734dbc99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -34,14 +34,18 @@ public class Snapshot {
@SerializedName(value = "expired_at")
private long expiredAt = 0;
+ @SerializedName(value = "commitSeq")
+ private long commitSeq = 0;
+
public Snapshot() {
}
- public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt)
{
+ public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt,
long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
this.expiredAt = expiredAt;
+ this.commitSeq = commitSeq;
}
public byte[] getMeta() {
@@ -60,6 +64,10 @@ public class Snapshot {
return System.currentTimeMillis() > expiredAt;
}
+ public long getCommitSeq() { // Accessor for the commit seq passed to the Snapshot constructor (0 when the no-arg constructor was used).
+ return commitSeq;
+ }
+
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
@@ -71,6 +79,7 @@ public class Snapshot {
+ ", meta=" + meta
+ ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ + ", commitSeq=" + commitSeq
+ '}';
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 11a60360426..2cdd46e6227 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -538,6 +538,10 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>,
}
}
+ public Set<String> getTableNames() { // Returns the current table names as a new, independent HashSet; callers may mutate the result freely.
+ return new HashSet<>(this.nameToTable.keySet()); // no lock is acquired here; the BackupHandler call site in this change wraps the call in db.readLock()/readUnlock()
+ }
+
/**
* This is a thread-safe method when nameToTable is a concurrent hash map
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 2b4245b290c..4a9ce13e03b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
public BarrierLog() {
}
+ public BarrierLog(long dbId, String dbName) { // Database-level barrier: records only the database id and name.
+ this.dbId = dbId;
+ this.dbName = dbName; // NOTE(review): table id/name are not assigned by this overload; presumably they keep their declared defaults, confirm against the field declarations
+ }
+
public BarrierLog(long dbId, String dbName, long tableId, String
tableName) {
this.dbId = dbId;
this.dbName = dbName;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 2e7980d80d4..e35fd2dc852 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2952,9 +2952,10 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();
long expiredAt = snapshot.getExpiredAt();
+ long commitSeq = snapshot.getCommitSeq();
- LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info
size: {}, expired at: {}",
- label, meta.length, jobInfo.length, expiredAt);
+ LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info
size: {}, "
+ + "expired at: {}, commit seq: {}", label, meta.length,
jobInfo.length, expiredAt, commitSeq);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
@@ -2967,6 +2968,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
+ result.setCommitSeq(commitSeq);
}
return result;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 68f02e8eee8..1e22a6cb5af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -215,7 +215,7 @@ public class BackupJobTest {
new TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs,
13600 * 1000, BackupStmt.BackupContent.ALL,
- env, repo.getId());
+ env, repo.getId(), 0);
}
@Test
@@ -351,7 +351,7 @@ public class BackupJobTest {
new TableRef(new
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME,
"unknown_tbl"),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs,
13600 * 1000, BackupStmt.BackupContent.ALL,
- env, repo.getId());
+ env, repo.getId(), 0);
job.run();
Assert.assertEquals(Status.ErrCode.NOT_FOUND,
job.getStatus().getErrCode());
Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
@@ -368,7 +368,7 @@ public class BackupJobTest {
new TableRef(new
TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME,
UnitTestUtil.TABLE_NAME),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs,
13600 * 1000, BackupStmt.BackupContent.ALL,
- env, repo.getId());
+ env, repo.getId(), 123);
job.write(out);
out.flush();
@@ -383,6 +383,7 @@ public class BackupJobTest {
Assert.assertEquals(job.getDbId(), job2.getDbId());
Assert.assertEquals(job.getCreateTime(), job2.getCreateTime());
Assert.assertEquals(job.getType(), job2.getType());
+ Assert.assertEquals(job.getCommitSeq(), job2.getCommitSeq());
// 3. delete files
in.close();
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 181b632e43f..daf02cdf46b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1363,6 +1363,7 @@ struct TGetSnapshotResult {
4: optional Types.TNetworkAddress master_address
5: optional bool compressed;
6: optional i64 expiredAt; // in millis
+ 7: optional i64 commit_seq;
}
struct TTableRef {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]