This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f9b70bd44f5 [fix](backup) Support backup meta/job info exceeds 2GB (#55599)
f9b70bd44f5 is described below
commit f9b70bd44f5f86f990d1593aac89900a898d82cb
Author: walter <[email protected]>
AuthorDate: Tue Sep 9 10:50:54 2025 +0800
[fix](backup) Support backup meta/job info exceeds 2GB (#55599)
---
.../java/org/apache/doris/common/GZIPUtils.java | 21 ++++++++
.../org/apache/doris/analysis/RestoreStmt.java | 12 +++--
.../org/apache/doris/backup/BackupHandler.java | 12 +----
.../java/org/apache/doris/backup/BackupJob.java | 16 ++----
.../org/apache/doris/backup/BackupJobInfo.java | 9 ++++
.../java/org/apache/doris/backup/BackupMeta.java | 2 +-
.../java/org/apache/doris/backup/Snapshot.java | 62 ++++++++++++++++------
.../apache/doris/service/FrontendServiceImpl.java | 52 +++++++++++++-----
8 files changed, 130 insertions(+), 56 deletions(-)
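Editor's note before the per-file diffs: a Java byte[] is indexed by int, so a single array cannot hold more than Integer.MAX_VALUE (about 2GB) bytes, and meta/job info larger than that cannot be shipped as one raw array. This patch keeps the files on disk and only materializes a gzip-compressed copy, streaming the file through GZIPOutputStream in small chunks. A minimal, self-contained sketch of that streaming pattern follows; the class and method names here are illustrative only, not the Doris API:

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    // Illustrative sketch: gzip a large on-disk file into an in-memory buffer.
    // Only the compressed output must fit in a byte[], so the raw file may be
    // larger than 2GB as long as the compressed form stays under that limit.
    public class CompressFileSketch {
        static byte[] gzipFile(File file) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (FileInputStream in = new FileInputStream(file);
                    GZIPOutputStream gzip = new GZIPOutputStream(out)) {
                byte[] buffer = new byte[8192]; // read in 8KB chunks
                int n;
                while ((n = in.read(buffer)) != -1) {
                    gzip.write(buffer, 0, n);
                }
            } // closing the GZIPOutputStream writes the gzip trailer
            return out.toByteArray();
        }
    }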
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
index 7408e2888cc..4500c76f638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -21,7 +21,10 @@ import org.apache.commons.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -39,10 +42,28 @@ public class GZIPUtils {
return bytesStream.toByteArray();
}
+ public static byte[] compress(File file) throws IOException {
+ ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+ try (FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
+
+ byte[] buffer = new byte[8192]; // 8KB buffer
+ int bytesRead;
+ while ((bytesRead = fileInputStream.read(buffer)) != -1) {
+ gzipStream.write(buffer, 0, bytesRead);
+ }
+ }
+ return bytesStream.toByteArray();
+ }
+
public static byte[] decompress(byte[] data) throws IOException {
ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
return IOUtils.toByteArray(gzipStream);
}
}
+
+ public static InputStream lazyDecompress(byte[] data) throws IOException {
+ return new GZIPInputStream(new ByteArrayInputStream(data));
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index 72d53e8ff01..05b7e9fd733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -17,6 +17,8 @@
package org.apache.doris.analysis;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
@@ -58,8 +60,8 @@ public class RestoreStmt extends AbstractBackupStmt {
private boolean isCleanPartitions = false;
private boolean isAtomicRestore = false;
private boolean isForceReplace = false;
- private byte[] meta = null;
- private byte[] jobInfo = null;
+ private BackupMeta meta = null;
+ private BackupJobInfo jobInfo = null;
public RestoreStmt(LabelName labelName, String repoName,
AbstractBackupTableRefClause restoreTableRefClause,
Map<String, String> properties) {
@@ -67,7 +69,7 @@ public class RestoreStmt extends AbstractBackupStmt {
}
public RestoreStmt(LabelName labelName, String repoName,
AbstractBackupTableRefClause restoreTableRefClause,
- Map<String, String> properties, byte[] meta, byte[] jobInfo) {
+ Map<String, String> properties, BackupMeta meta, BackupJobInfo jobInfo) {
super(labelName, repoName, restoreTableRefClause, properties);
this.meta = meta;
this.jobInfo = jobInfo;
@@ -101,11 +103,11 @@ public class RestoreStmt extends AbstractBackupStmt {
return isLocal;
}
- public byte[] getMeta() {
+ public BackupMeta getMeta() {
return meta;
}
- public byte[] getJobInfo() {
+ public BackupJobInfo getJobInfo() {
return jobInfo;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index f5083f57f05..48f2a3cfaa6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -490,9 +490,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
private void restore(Repository repository, Database db, RestoreStmt stmt) throws DdlException {
BackupJobInfo jobInfo;
if (stmt.isLocal()) {
- String jobInfoString = new String(stmt.getJobInfo());
- jobInfo = BackupJobInfo.genFromJson(jobInfoString);
-
+ jobInfo = stmt.getJobInfo();
if (jobInfo.extraInfo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
}
@@ -528,13 +526,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
metaVersion = jobInfo.metaVersion;
}
- BackupMeta backupMeta;
- try {
- backupMeta = BackupMeta.fromBytes(stmt.getMeta(), metaVersion);
- } catch (IOException e) {
- LOG.warn("read backup meta failed, current meta version {}",
Env.getCurrentEnvJournalVersion(), e);
- throw new DdlException("read backup meta failed", e);
- }
+ BackupMeta backupMeta = stmt.getMeta();
String backupTimestamp = TimeUtils.longToTimeString(
jobInfo.getBackupTime(),
TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
restoreJob = new RestoreJob(stmt.getLabel(), backupTimestamp,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 89b0ddaa949..ca77bb3de23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -1060,20 +1060,12 @@ public class BackupJob extends AbstractJob {
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
- return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq);
+ return new Snapshot(label, null, null, expiredAt, commitSeq);
}
- try {
- File metaInfoFile = new File(localMetaInfoFilePath);
- File jobInfoFile = new File(localJobInfoFilePath);
- byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
- byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
- return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq);
- } catch (IOException e) {
- LOG.warn("failed to load meta info and job info file, meta info
file {}, job info file {}: ",
- localMetaInfoFilePath, localJobInfoFilePath, e);
- return null;
- }
+ File metaInfoFile = new File(localMetaInfoFilePath);
+ File jobInfoFile = new File(localJobInfoFilePath);
+ return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, commitSeq);
}
public synchronized List<String> getInfo() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index aa127961e3d..8065058f020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -54,6 +54,8 @@ import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -760,6 +762,13 @@ public class BackupJobInfo implements Writable {
return jobInfo;
}
+ public static BackupJobInfo fromInputStream(InputStream inputStream) throws IOException {
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ BackupJobInfo jobInfo = GsonUtils.GSON.fromJson(reader, BackupJobInfo.class);
+ jobInfo.initBackupJobInfoAfterDeserialize();
+ return jobInfo;
+ }
+ }
public void writeToFile(File jobInfoFile) throws FileNotFoundException {
PrintWriter printWriter = new PrintWriter(jobInfoFile);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index 6a973ea45a2..bef51db8d48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -92,7 +92,7 @@ public class BackupMeta implements Writable {
return fromInputStream(new ByteArrayInputStream(bytes), metaVersion);
}
- protected static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException {
+ public static BackupMeta fromInputStream(InputStream stream, int metaVersion) throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(metaVersion);
metaContext.setThreadLocalInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index a9f734dbc99..2fc3ca6d146 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -17,19 +17,22 @@
package org.apache.doris.backup;
-import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.common.GZIPUtils;
+import org.apache.doris.common.Pair;
import com.google.gson.annotations.SerializedName;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
public class Snapshot {
@SerializedName(value = "label")
private String label = null;
- @SerializedName(value = "meta")
- private byte[] meta = null;
+ private File meta = null;
- @SerializedName(value = "jobInfo")
- private byte[] jobInfo = null;
+ private File jobInfo = null;
@SerializedName(value = "expired_at")
private long expiredAt = 0;
@@ -40,7 +43,7 @@ public class Snapshot {
public Snapshot() {
}
- public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) {
+ public Snapshot(String label, File meta, File jobInfo, long expiredAt, long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
@@ -48,12 +51,45 @@ public class Snapshot {
this.commitSeq = commitSeq;
}
- public byte[] getMeta() {
- return meta;
+ public static Pair<BackupMeta, BackupJobInfo> readFromBytes(byte[] meta, byte[] jobInfo) throws IOException {
+ BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new String(jobInfo));
+ BackupMeta backupMeta = BackupMeta.fromBytes(meta, backupJobInfo.metaVersion);
+ return Pair.of(backupMeta, backupJobInfo);
+ }
+
+ public static Pair<BackupMeta, BackupJobInfo> readFromCompressedBytes(byte[] meta, byte[] jobInfo)
+ throws IOException {
+ BackupJobInfo backupJobInfo = BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo));
+ BackupMeta backupMeta = BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), backupJobInfo.metaVersion);
+ return Pair.of(backupMeta, backupJobInfo);
+ }
+
+ public static boolean isCompressed(byte[] meta, byte[] jobInfo) {
+ return GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta);
+ }
+
+ public long getMetaSize() {
+ return meta != null ? meta.length() : 0;
+ }
+
+ public long getJobInfoSize() {
+ return jobInfo != null ? jobInfo.length() : 0;
}
- public byte[] getJobInfo() {
- return jobInfo;
+ public byte[] getCompressedMeta() throws IOException {
+ return GZIPUtils.compress(meta);
+ }
+
+ public byte[] getCompressedJobInfo() throws IOException {
+ return GZIPUtils.compress(jobInfo);
+ }
+
+ public byte[] getMeta() throws IOException {
+ return Files.readAllBytes(meta.toPath());
+ }
+
+ public byte[] getJobInfo() throws IOException {
+ return Files.readAllBytes(jobInfo.toPath());
}
public long getExpiredAt() {
@@ -68,16 +104,10 @@ public class Snapshot {
return commitSeq;
}
- public String toJson() {
- return GsonUtils.GSON.toJson(this);
- }
-
@Override
public String toString() {
return "Snapshot{"
+ "label='" + label + '\''
- + ", meta=" + meta
- + ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ ", commitSeq=" + commitSeq
+ '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c83930dd8c4..409728f9ab8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -27,6 +27,8 @@ import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
@@ -53,7 +55,6 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@@ -3067,24 +3068,38 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
result.getStatus().addToErrorMsgs(String.format("snapshot %s is expired", label));
} else {
- byte[] meta = snapshot.getMeta();
- byte[] jobInfo = snapshot.getJobInfo();
+ long metaSize = snapshot.getMetaSize();
+ long jobInfoSize = snapshot.getJobInfoSize();
+ long snapshotSize = snapshot.getMetaSize() + snapshot.getJobInfoSize();
+ if (metaSize + jobInfoSize >= Integer.MAX_VALUE && !request.isEnableCompress()) {
+ String msg = String.format(
+ "Snapshot %s is too large (%d bytes > 2GB). Please
enable compression to continue.",
+ label, snapshotSize);
+ LOG.warn("get snapshot failed: {}", msg);
+ result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+ result.getStatus().addToErrorMsgs(msg);
+ return result;
+ }
+
long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info
size: {}, "
- + "expired at: {}, commit seq: {}", label, meta.length,
jobInfo.length, expiredAt, commitSeq);
+ + "expired at: {}, commit seq: {}", label, metaSize,
jobInfoSize, expiredAt, commitSeq);
if (request.isEnableCompress()) {
- meta = GZIPUtils.compress(meta);
- jobInfo = GZIPUtils.compress(jobInfo);
+ byte[] meta = snapshot.getCompressedMeta();
+ byte[] jobInfo = snapshot.getCompressedJobInfo();
+ result.setMeta(meta);
+ result.setJobInfo(jobInfo);
result.setCompressed(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot info with compress, snapshot: {},
compressed meta "
+ "size {}, compressed job info size {}", label,
meta.length, jobInfo.length);
}
+ } else {
+ result.setMeta(snapshot.getMeta());
+ result.setJobInfo(snapshot.getJobInfo());
}
- result.setMeta(meta);
- result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}
@@ -3197,6 +3212,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
+ BackupMeta backupMeta;
+ BackupJobInfo backupJobInfo;
byte[] meta = request.getMeta();
byte[] jobInfo = request.getJobInfo();
if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) {
@@ -3205,18 +3222,29 @@ public class FrontendServiceImpl implements FrontendService.Iface {
meta.length, jobInfo.length);
}
try {
- meta = GZIPUtils.decompress(meta);
- jobInfo = GZIPUtils.decompress(jobInfo);
+ Pair<BackupMeta, BackupJobInfo> pair = Snapshot.readFromCompressedBytes(meta, jobInfo);
+ backupMeta = pair.first;
+ backupJobInfo = pair.second;
} catch (Exception e) {
LOG.warn("decompress meta and job info failed", e);
throw new UserException("decompress meta and job info failed",
e);
}
- } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) {
+ } else if (Snapshot.isCompressed(meta, jobInfo)) {
throw new UserException("The request is compressed, but the config
"
+ "`enable_restore_snapshot_rpc_compressed` is not
enabled.");
+ } else {
+ try {
+ Pair<BackupMeta, BackupJobInfo> pair = Snapshot.readFromBytes(meta, jobInfo);
+ backupMeta = pair.first;
+ backupJobInfo = pair.second;
+ } catch (Exception e) {
+ LOG.warn("deserialize meta and job info failed", e);
+ throw new UserException("deserialize meta and job info
failed", e);
+ }
}
- RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo);
+ RestoreStmt restoreStmt = new RestoreStmt(
+ label, repoName, restoreTableRefClause, properties, backupMeta, backupJobInfo);
restoreStmt.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
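A note on the restore path shown in the FrontendServiceImpl hunks above: the compressed payload is no longer fully inflated into a second byte[]; it is wrapped in a GZIPInputStream and handed to the deserializer as a stream. A rough, self-contained illustration of that pattern, assuming a hypothetical Payload class in place of BackupJobInfo/BackupMeta and plain Gson in place of Doris's GsonUtils:

    import com.google.gson.Gson;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPInputStream;

    public class LazyDecompressSketch {
        // Hypothetical stand-in for the real deserialization target.
        static class Payload {
            String label;
        }

        static Payload readCompressedJson(byte[] gzipped) throws IOException {
            // Bytes are inflated on demand as the parser reads, instead of
            // decompressing everything into one large byte[] first.
            InputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped));
            try (InputStreamReader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
                return new Gson().fromJson(reader, Payload.class);
            }
        }
    }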
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]