This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 80147f86d26 [fix](backup) Support backup meta/job info exceeds 2GB
(#55608)
80147f86d26 is described below
commit 80147f86d26ad26686b1c3a0ede2d6d2e903bbbd
Author: walter <[email protected]>
AuthorDate: Sat Sep 6 17:06:20 2025 +0800
[fix](backup) Support backup meta/job info exceeds 2GB (#55608)
---
.../java/org/apache/doris/common/GZIPUtils.java | 21 ++++++++
.../org/apache/doris/backup/BackupHandler.java | 11 +---
.../java/org/apache/doris/backup/BackupJob.java | 16 ++----
.../org/apache/doris/backup/BackupJobInfo.java | 8 +++
.../java/org/apache/doris/backup/BackupMeta.java | 2 +-
.../java/org/apache/doris/backup/Snapshot.java | 62 ++++++++++++++++------
.../trees/plans/commands/RestoreCommand.java | 14 ++---
.../apache/doris/service/FrontendServiceImpl.java | 53 +++++++++++++-----
8 files changed, 130 insertions(+), 57 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
index 7408e2888cc..4500c76f638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -21,7 +21,10 @@ import org.apache.commons.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -39,10 +42,28 @@ public class GZIPUtils {
return bytesStream.toByteArray();
}
+ public static byte[] compress(File file) throws IOException {
+ ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+ try (FileInputStream fileInputStream = new FileInputStream(file);
+ GZIPOutputStream gzipStream = new
GZIPOutputStream(bytesStream)) {
+
+ byte[] buffer = new byte[8192]; // 8KB buffer
+ int bytesRead;
+ while ((bytesRead = fileInputStream.read(buffer)) != -1) {
+ gzipStream.write(buffer, 0, bytesRead);
+ }
+ }
+ return bytesStream.toByteArray();
+ }
+
public static byte[] decompress(byte[] data) throws IOException {
ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
return IOUtils.toByteArray(gzipStream);
}
}
+
+ public static InputStream lazyDecompress(byte[] data) throws IOException {
+ return new GZIPInputStream(new ByteArrayInputStream(data));
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 83cd86fee89..8518b6606ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -608,8 +608,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not
supported now.");
}
if (command.isLocal()) {
- String jobInfoString = new String(command.getJobInfo());
- jobInfo = BackupJobInfo.genFromJson(jobInfoString);
+ jobInfo = command.getJobInfo();
if (jobInfo.extraInfo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Invalid job extra info empty");
@@ -646,13 +645,7 @@ public class BackupHandler extends MasterDaemon implements
Writable {
metaVersion = jobInfo.metaVersion;
}
- BackupMeta backupMeta;
- try {
- backupMeta = BackupMeta.fromBytes(command.getMeta(),
metaVersion);
- } catch (IOException e) {
- LOG.warn("read backup meta failed, current meta version {}",
Env.getCurrentEnvJournalVersion(), e);
- throw new DdlException("read backup meta failed", e);
- }
+ BackupMeta backupMeta = command.getMeta();
String backupTimestamp = TimeUtils.longToTimeString(
jobInfo.getBackupTime(),
TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
restoreJob = new RestoreJob(command.getLabel(), backupTimestamp,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index a7bf13d0a3c..1ea7d6ee6a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -1163,20 +1163,12 @@ public class BackupJob extends AbstractJob implements
GsonPostProcessable {
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
- return new Snapshot(label, new byte[0], new byte[0], expiredAt,
commitSeq);
+ return new Snapshot(label, null, null, expiredAt, commitSeq);
}
- try {
- File metaInfoFile = new File(localMetaInfoFilePath);
- File jobInfoFile = new File(localJobInfoFilePath);
- byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
- byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
- return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt,
commitSeq);
- } catch (IOException e) {
- LOG.warn("failed to load meta info and job info file, meta info
file {}, job info file {}: ",
- localMetaInfoFilePath, localJobInfoFilePath, e);
- return null;
- }
+ File metaInfoFile = new File(localMetaInfoFilePath);
+ File jobInfoFile = new File(localJobInfoFilePath);
+ return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt,
commitSeq);
}
public synchronized List<String> getInfo() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index ccf4941538a..643aead0446 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -53,6 +53,8 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -827,6 +829,12 @@ public class BackupJobInfo implements GsonPostProcessable {
return jobInfo;
}
+ public static BackupJobInfo fromInputStream(InputStream inputStream)
throws IOException {
+ try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+ return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class);
+ }
+ }
+
public void writeToFile(File jobInfoFile) throws FileNotFoundException {
PrintWriter printWriter = new PrintWriter(jobInfoFile);
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index 41b48af6bcb..7d34e0ce5d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -101,7 +101,7 @@ public class BackupMeta implements Writable,
GsonPostProcessable {
return fromInputStream(new ByteArrayInputStream(bytes), metaVersion);
}
- protected static BackupMeta fromInputStream(InputStream stream, int
metaVersion) throws IOException {
+ public static BackupMeta fromInputStream(InputStream stream, int
metaVersion) throws IOException {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(metaVersion);
metaContext.setThreadLocalInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index a9f734dbc99..2fc3ca6d146 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -17,19 +17,22 @@
package org.apache.doris.backup;
-import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.common.GZIPUtils;
+import org.apache.doris.common.Pair;
import com.google.gson.annotations.SerializedName;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
public class Snapshot {
@SerializedName(value = "label")
private String label = null;
- @SerializedName(value = "meta")
- private byte[] meta = null;
+ private File meta = null;
- @SerializedName(value = "jobInfo")
- private byte[] jobInfo = null;
+ private File jobInfo = null;
@SerializedName(value = "expired_at")
private long expiredAt = 0;
@@ -40,7 +43,7 @@ public class Snapshot {
public Snapshot() {
}
- public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt,
long commitSeq) {
+ public Snapshot(String label, File meta, File jobInfo, long expiredAt,
long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
@@ -48,12 +51,45 @@ public class Snapshot {
this.commitSeq = commitSeq;
}
- public byte[] getMeta() {
- return meta;
+ public static Pair<BackupMeta, BackupJobInfo> readFromBytes(byte[] meta,
byte[] jobInfo) throws IOException {
+ BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new
String(jobInfo));
+ BackupMeta backupMeta = BackupMeta.fromBytes(meta,
backupJobInfo.metaVersion);
+ return Pair.of(backupMeta, backupJobInfo);
+ }
+
+ public static Pair<BackupMeta, BackupJobInfo>
readFromCompressedBytes(byte[] meta, byte[] jobInfo)
+ throws IOException {
+ BackupJobInfo backupJobInfo =
BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo));
+ BackupMeta backupMeta =
BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta),
backupJobInfo.metaVersion);
+ return Pair.of(backupMeta, backupJobInfo);
+ }
+
+ public static boolean isCompressed(byte[] meta, byte[] jobInfo) {
+ return GZIPUtils.isGZIPCompressed(jobInfo) ||
GZIPUtils.isGZIPCompressed(meta);
+ }
+
+ public long getMetaSize() {
+ return meta != null ? meta.length() : 0;
+ }
+
+ public long getJobInfoSize() {
+ return jobInfo != null ? jobInfo.length() : 0;
}
- public byte[] getJobInfo() {
- return jobInfo;
+ public byte[] getCompressedMeta() throws IOException {
+ return GZIPUtils.compress(meta);
+ }
+
+ public byte[] getCompressedJobInfo() throws IOException {
+ return GZIPUtils.compress(jobInfo);
+ }
+
+ public byte[] getMeta() throws IOException {
+ return Files.readAllBytes(meta.toPath());
+ }
+
+ public byte[] getJobInfo() throws IOException {
+ return Files.readAllBytes(jobInfo.toPath());
}
public long getExpiredAt() {
@@ -68,16 +104,10 @@ public class Snapshot {
return commitSeq;
}
- public String toJson() {
- return GsonUtils.GSON.toJson(this);
- }
-
@Override
public String toString() {
return "Snapshot{"
+ "label='" + label + '\''
- + ", meta=" + meta
- + ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ ", commitSeq=" + commitSeq
+ '}';
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
index c76368f4e64..673e8f1700b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
@@ -18,6 +18,8 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
@@ -94,8 +96,8 @@ public class RestoreCommand extends Command implements
ForwardWithSync {
private final Map<String, String> properties;
private final boolean isExclude;
private long timeoutMs;
- private byte[] meta = null;
- private byte[] jobInfo = null;
+ private BackupMeta meta = null;
+ private BackupJobInfo jobInfo = null;
private String storageVaultName = null;
/**
@@ -410,19 +412,19 @@ public class RestoreCommand extends Command implements
ForwardWithSync {
return isLocal;
}
- public byte[] getMeta() {
+ public BackupMeta getMeta() {
return meta;
}
- public void setMeta(byte[] meta) {
+ public void setMeta(BackupMeta meta) {
this.meta = meta;
}
- public byte[] getJobInfo() {
+ public BackupJobInfo getJobInfo() {
return jobInfo;
}
- public void setJobInfo(byte[] jobInfo) {
+ public void setJobInfo(BackupJobInfo jobInfo) {
this.jobInfo = jobInfo;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 70234ae52ba..83c53c532dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -29,6 +29,8 @@ import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
@@ -62,7 +64,6 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@@ -3125,24 +3126,38 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
result.getStatus().addToErrorMsgs(String.format("snapshot %s is
expired", label));
} else {
- byte[] meta = snapshot.getMeta();
- byte[] jobInfo = snapshot.getJobInfo();
+ long metaSize = snapshot.getMetaSize();
+ long jobInfoSize = snapshot.getJobInfoSize();
+ long snapshotSize = snapshot.getMetaSize() +
snapshot.getJobInfoSize();
+ if (metaSize + jobInfoSize >= Integer.MAX_VALUE &&
!request.isEnableCompress()) {
+ String msg = String.format(
+ "Snapshot %s is too large (%d bytes > 2GB). Please
enable compression to continue.",
+ label, snapshotSize);
+ LOG.warn("get snapshot failed: {}", msg);
+ result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+ result.getStatus().addToErrorMsgs(msg);
+ return result;
+ }
+
long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info
size: {}, "
- + "expired at: {}, commit seq: {}", label, meta.length,
jobInfo.length, expiredAt, commitSeq);
+ + "expired at: {}, commit seq: {}", label, metaSize,
jobInfoSize, expiredAt, commitSeq);
if (request.isEnableCompress()) {
- meta = GZIPUtils.compress(meta);
- jobInfo = GZIPUtils.compress(jobInfo);
+ byte[] meta = snapshot.getCompressedMeta();
+ byte[] jobInfo = snapshot.getCompressedJobInfo();
+ result.setMeta(meta);
+ result.setJobInfo(jobInfo);
result.setCompressed(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot info with compress, snapshot: {},
compressed meta "
+ "size {}, compressed job info size {}", label,
meta.length, jobInfo.length);
}
+ } else {
+ result.setMeta(snapshot.getMeta());
+ result.setJobInfo(snapshot.getJobInfo());
}
- result.setMeta(meta);
- result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}
@@ -3255,6 +3270,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
}
+ BackupMeta backupMeta;
+ BackupJobInfo backupJobInfo;
byte[] meta = request.getMeta();
byte[] jobInfo = request.getJobInfo();
if (Config.enable_restore_snapshot_rpc_compression &&
request.isCompressed()) {
@@ -3263,15 +3280,25 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
meta.length, jobInfo.length);
}
try {
- meta = GZIPUtils.decompress(meta);
- jobInfo = GZIPUtils.decompress(jobInfo);
+ Pair<BackupMeta, BackupJobInfo> pair =
Snapshot.readFromCompressedBytes(meta, jobInfo);
+ backupMeta = pair.first;
+ backupJobInfo = pair.second;
} catch (Exception e) {
LOG.warn("decompress meta and job info failed", e);
throw new UserException("decompress meta and job info failed",
e);
}
- } else if (GZIPUtils.isGZIPCompressed(jobInfo) ||
GZIPUtils.isGZIPCompressed(meta)) {
+ } else if (Snapshot.isCompressed(meta, jobInfo)) {
throw new UserException("The request is compressed, but the config
"
                        + "`enable_restore_snapshot_rpc_compression` is not
enabled.");
+ } else {
+ try {
+ Pair<BackupMeta, BackupJobInfo> pair =
Snapshot.readFromBytes(meta, jobInfo);
+ backupMeta = pair.first;
+ backupJobInfo = pair.second;
+ } catch (Exception e) {
+ LOG.warn("deserialize meta and job info failed", e);
+ throw new UserException("deserialize meta and job info
failed", e);
+ }
}
//instantiate RestoreCommand
@@ -3331,8 +3358,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
RestoreCommand restoreCommand = new RestoreCommand(labelNameInfo,
repoName, tableRefInfos, properties, false);
- restoreCommand.setMeta(meta);
- restoreCommand.setJobInfo(jobInfo);
+ restoreCommand.setMeta(backupMeta);
+ restoreCommand.setJobInfo(backupJobInfo);
restoreCommand.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreCommand: {}", restoreCommand);
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]