This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5dda61b410c [fix](backup) Load backup meta and job info bytes from disk #43276 (#43519)
5dda61b410c is described below
commit 5dda61b410cdde1dc7495dee9a95eeafe6f6fb25
Author: walter <[email protected]>
AuthorDate: Mon Nov 11 14:08:35 2024 +0800
[fix](backup) Load backup meta and job info bytes from disk #43276 (#43519)
cherry pick from #43276
---
.../java/org/apache/doris/backup/AbstractJob.java | 2 +
.../org/apache/doris/backup/BackupHandler.java | 50 +++++++++++++++++++---
.../java/org/apache/doris/backup/BackupJob.java | 45 ++++++++++++-------
.../java/org/apache/doris/backup/RestoreJob.java | 5 +++
.../apache/doris/service/FrontendServiceImpl.java | 7 ++-
5 files changed, 85 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index 4e2c3fd1990..f22598dd86b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public abstract class AbstractJob implements Writable {
public abstract boolean isCancelled();
+ public abstract boolean isFinished();
+
public abstract Status updateRepo(Repository repo);
public static AbstractJob read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index d9b7659cfc1..49190acce1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -109,10 +109,10 @@ public class BackupHandler extends MasterDaemon implements Writable {
private Env env;
- // map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes
- // this map not present in persist && only in fe master memory
+ // map to store backup info, key is label name, value is the BackupJob
+ // this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
- private final Map<String, Snapshot> localSnapshots = new HashMap<>();
+ private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
public BackupHandler() {
@@ -167,6 +167,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
return false;
}
}
+
isInit = true;
return true;
}
@@ -544,11 +545,15 @@ public class BackupHandler extends MasterDaemon implements Writable {
return;
}
+ List<String> removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque<AbstractJob> jobs =
dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
- jobs.removeFirst();
+ AbstractJob removedJob = jobs.removeFirst();
+ if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
+ removedLabels.add(removedJob.getLabel());
+ }
}
AbstractJob lastJob = jobs.peekLast();
@@ -561,6 +566,17 @@ public class BackupHandler extends MasterDaemon implements Writable {
} finally {
jobLock.unlock();
}
+
+ if (job.isFinished() && job instanceof BackupJob) {
+ // Save snapshot to local repo, when reload backupHandler from image.
+ BackupJob backupJob = (BackupJob) job;
+ if (backupJob.isLocalSnapshot()) {
+ addSnapshot(backupJob.getLabel(), backupJob);
+ }
+ }
+ for (String label : removedLabels) {
+ removeSnapshot(label);
+ }
}
private List<AbstractJob> getAllCurrentJobs() {
@@ -799,22 +815,42 @@ public class BackupHandler extends MasterDaemon implements Writable {
return false;
}
- public void addSnapshot(String labelName, Snapshot snapshot) {
+ public void addSnapshot(String labelName, BackupJob backupJob) {
+ assert backupJob.isFinished();
+
+ LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
- localSnapshots.put(labelName, snapshot);
+ localSnapshots.put(labelName, backupJob);
+ } finally {
+ localSnapshotsLock.writeLock().unlock();
+ }
+ }
+
+ public void removeSnapshot(String labelName) {
+ LOG.info("remove snapshot {} from local repo", labelName);
+ localSnapshotsLock.writeLock().lock();
+ try {
+ localSnapshots.remove(labelName);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}
public Snapshot getSnapshot(String labelName) {
+ BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
- return localSnapshots.get(labelName);
+ backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}
+
+ if (backupJob == null) {
+ return null;
+ }
+
+ return backupJob.getSnapshot();
}
public static BackupHandler read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b24fb9fe7fd..9e932d6f8fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -123,9 +123,6 @@ public class BackupJob extends AbstractJob {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();
- private byte[] metaInfoBytes = null;
- private byte[] jobInfoBytes = null;
-
public BackupJob() {
super(JobType.BACKUP);
}
@@ -337,11 +334,7 @@ public class BackupJob extends AbstractJob {
@Override
public synchronized void replayRun() {
- LOG.info("replay run backup job: {}", this);
- if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
- Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
- env.getBackupHandler().addSnapshot(label, snapshot);
- }
+ // nothing to do
}
@Override
@@ -359,6 +352,11 @@ public class BackupJob extends AbstractJob {
return state == BackupJobState.CANCELLED;
}
+ @Override
+ public boolean isFinished() {
+ return state == BackupJobState.FINISHED;
+ }
+
@Override
public synchronized Status updateRepo(Repository repo) {
this.repo = repo;
@@ -844,8 +842,6 @@ public class BackupJob extends AbstractJob {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
- // read meta info to metaInfoBytes
- metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
@@ -872,8 +868,6 @@ public class BackupJob extends AbstractJob {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
- // read job info to jobInfoBytes
- jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
@@ -927,7 +921,6 @@ public class BackupJob extends AbstractJob {
}
}
-
finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;
@@ -936,8 +929,7 @@ public class BackupJob extends AbstractJob {
LOG.info("job is finished. {}", this);
if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
- Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
- env.getBackupHandler().addSnapshot(label, snapshot);
+ env.getBackupHandler().addSnapshot(label, this);
return;
}
}
@@ -1030,6 +1022,29 @@ public class BackupJob extends AbstractJob {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}
+ public boolean isLocalSnapshot() {
+ return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+ }
+
+ // read meta and job info bytes from disk, and return the snapshot
+ public synchronized Snapshot getSnapshot() {
+ if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
+ return null;
+ }
+
+ try {
+ File metaInfoFile = new File(localMetaInfoFilePath);
+ File jobInfoFile = new File(localJobInfoFilePath);
+ byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
+ byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
+ return new Snapshot(label, metaInfoBytes, jobInfoBytes);
+ } catch (IOException e) {
+ LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
+ localMetaInfoFilePath, localJobInfoFilePath, e);
+ return null;
+ }
+ }
+
public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 5a3b569a7cb..1f13a2970d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -379,6 +379,11 @@ public class RestoreJob extends AbstractJob {
return state == RestoreJobState.CANCELLED;
}
+ @Override
+ public boolean isFinished() {
+ return state == RestoreJobState.FINISHED;
+ }
+
@Override
public synchronized Status updateRepo(Repository repo) {
this.repo = repo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d37e54deba8..d1aefdec5a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3018,15 +3018,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
// Step 3: get snapshot
+ String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
- Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+ Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
- result.getStatus().addToErrorMsgs("snapshot not exist");
+ result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
+ LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
+ label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}
return result;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]