This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 80147f86d26 [fix](backup) Support backup meta/job info exceeds 2GB (#55608)
80147f86d26 is described below

commit 80147f86d26ad26686b1c3a0ede2d6d2e903bbbd
Author: walter <[email protected]>
AuthorDate: Sat Sep 6 17:06:20 2025 +0800

    [fix](backup) Support backup meta/job info exceeds 2GB (#55608)
---
 .../java/org/apache/doris/common/GZIPUtils.java    | 21 ++++++++
 .../org/apache/doris/backup/BackupHandler.java     | 11 +---
 .../java/org/apache/doris/backup/BackupJob.java    | 16 ++----
 .../org/apache/doris/backup/BackupJobInfo.java     |  8 +++
 .../java/org/apache/doris/backup/BackupMeta.java   |  2 +-
 .../java/org/apache/doris/backup/Snapshot.java     | 62 ++++++++++++++++------
 .../trees/plans/commands/RestoreCommand.java       | 14 ++---
 .../apache/doris/service/FrontendServiceImpl.java  | 53 +++++++++++++-----
 8 files changed, 130 insertions(+), 57 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
index 7408e2888cc..4500c76f638 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -21,7 +21,10 @@ import org.apache.commons.io.IOUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -39,10 +42,28 @@ public class GZIPUtils {
         return bytesStream.toByteArray();
     }
 
+    public static byte[] compress(File file) throws IOException {
+        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+        try (FileInputStream fileInputStream = new FileInputStream(file);
+                GZIPOutputStream gzipStream = new 
GZIPOutputStream(bytesStream)) {
+
+            byte[] buffer = new byte[8192]; // 8KB buffer
+            int bytesRead;
+            while ((bytesRead = fileInputStream.read(buffer)) != -1) {
+                gzipStream.write(buffer, 0, bytesRead);
+            }
+        }
+        return bytesStream.toByteArray();
+    }
+
     public static byte[] decompress(byte[] data) throws IOException {
         ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
         try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
             return IOUtils.toByteArray(gzipStream);
         }
     }
+
+    public static InputStream lazyDecompress(byte[] data) throws IOException {
+        return new GZIPInputStream(new ByteArrayInputStream(data));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 83cd86fee89..8518b6606ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -608,8 +608,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
             ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not 
supported now.");
         }
         if (command.isLocal()) {
-            String jobInfoString = new String(command.getJobInfo());
-            jobInfo = BackupJobInfo.genFromJson(jobInfoString);
+            jobInfo = command.getJobInfo();
 
             if (jobInfo.extraInfo == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, 
"Invalid job extra info empty");
@@ -646,13 +645,7 @@ public class BackupHandler extends MasterDaemon implements 
Writable {
                 metaVersion = jobInfo.metaVersion;
             }
 
-            BackupMeta backupMeta;
-            try {
-                backupMeta = BackupMeta.fromBytes(command.getMeta(), 
metaVersion);
-            } catch (IOException e) {
-                LOG.warn("read backup meta failed, current meta version {}", 
Env.getCurrentEnvJournalVersion(), e);
-                throw new DdlException("read backup meta failed", e);
-            }
+            BackupMeta backupMeta = command.getMeta();
             String backupTimestamp = TimeUtils.longToTimeString(
                     jobInfo.getBackupTime(), 
TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
             restoreJob = new RestoreJob(command.getLabel(), backupTimestamp,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index a7bf13d0a3c..1ea7d6ee6a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -1163,20 +1163,12 @@ public class BackupJob extends AbstractJob implements 
GsonPostProcessable {
         // Avoid loading expired meta.
         long expiredAt = createTime + timeoutMs;
         if (System.currentTimeMillis() >= expiredAt) {
-            return new Snapshot(label, new byte[0], new byte[0], expiredAt, 
commitSeq);
+            return new Snapshot(label, null, null, expiredAt, commitSeq);
         }
 
-        try {
-            File metaInfoFile = new File(localMetaInfoFilePath);
-            File jobInfoFile = new File(localJobInfoFilePath);
-            byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
-            byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
-            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, 
commitSeq);
-        } catch (IOException e) {
-            LOG.warn("failed to load meta info and job info file, meta info 
file {}, job info file {}: ",
-                    localMetaInfoFilePath, localJobInfoFilePath, e);
-            return null;
-        }
+        File metaInfoFile = new File(localMetaInfoFilePath);
+        File jobInfoFile = new File(localJobInfoFilePath);
+        return new Snapshot(label, metaInfoFile, jobInfoFile, expiredAt, 
commitSeq);
     }
 
     public synchronized List<String> getInfo() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index ccf4941538a..643aead0446 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -53,6 +53,8 @@ import org.apache.logging.log4j.Logger;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -827,6 +829,12 @@ public class BackupJobInfo implements GsonPostProcessable {
         return jobInfo;
     }
 
+    public static BackupJobInfo fromInputStream(InputStream inputStream) 
throws IOException {
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            return GsonUtils.GSON.fromJson(reader, BackupJobInfo.class);
+        }
+    }
+
     public void writeToFile(File jobInfoFile) throws FileNotFoundException {
         PrintWriter printWriter = new PrintWriter(jobInfoFile);
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index 41b48af6bcb..7d34e0ce5d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -101,7 +101,7 @@ public class BackupMeta implements Writable, 
GsonPostProcessable {
         return fromInputStream(new ByteArrayInputStream(bytes), metaVersion);
     }
 
-    protected static BackupMeta fromInputStream(InputStream stream, int 
metaVersion) throws IOException {
+    public static BackupMeta fromInputStream(InputStream stream, int 
metaVersion) throws IOException {
         MetaContext metaContext = new MetaContext();
         metaContext.setMetaVersion(metaVersion);
         metaContext.setThreadLocalInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index a9f734dbc99..2fc3ca6d146 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -17,19 +17,22 @@
 
 package org.apache.doris.backup;
 
-import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.common.GZIPUtils;
+import org.apache.doris.common.Pair;
 
 import com.google.gson.annotations.SerializedName;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
 public class Snapshot {
     @SerializedName(value = "label")
     private String label = null;
 
-    @SerializedName(value = "meta")
-    private byte[] meta = null;
+    private File meta = null;
 
-    @SerializedName(value = "jobInfo")
-    private byte[] jobInfo = null;
+    private File jobInfo = null;
 
     @SerializedName(value = "expired_at")
     private long expiredAt = 0;
@@ -40,7 +43,7 @@ public class Snapshot {
     public Snapshot() {
     }
 
-    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, 
long commitSeq) {
+    public Snapshot(String label, File meta, File jobInfo, long expiredAt, 
long commitSeq) {
         this.label = label;
         this.meta = meta;
         this.jobInfo = jobInfo;
@@ -48,12 +51,45 @@ public class Snapshot {
         this.commitSeq = commitSeq;
     }
 
-    public byte[] getMeta() {
-        return meta;
+    public static Pair<BackupMeta, BackupJobInfo> readFromBytes(byte[] meta, 
byte[] jobInfo) throws IOException {
+        BackupJobInfo backupJobInfo = BackupJobInfo.genFromJson(new 
String(jobInfo));
+        BackupMeta backupMeta = BackupMeta.fromBytes(meta, 
backupJobInfo.metaVersion);
+        return Pair.of(backupMeta, backupJobInfo);
+    }
+
+    public static Pair<BackupMeta, BackupJobInfo> 
readFromCompressedBytes(byte[] meta, byte[] jobInfo)
+            throws IOException {
+        BackupJobInfo backupJobInfo = 
BackupJobInfo.fromInputStream(GZIPUtils.lazyDecompress(jobInfo));
+        BackupMeta backupMeta = 
BackupMeta.fromInputStream(GZIPUtils.lazyDecompress(meta), 
backupJobInfo.metaVersion);
+        return Pair.of(backupMeta, backupJobInfo);
+    }
+
+    public static boolean isCompressed(byte[] meta, byte[] jobInfo) {
+        return GZIPUtils.isGZIPCompressed(jobInfo) || 
GZIPUtils.isGZIPCompressed(meta);
+    }
+
+    public long getMetaSize() {
+        return meta != null ? meta.length() : 0;
+    }
+
+    public long getJobInfoSize() {
+        return jobInfo != null ? jobInfo.length() : 0;
     }
 
-    public byte[] getJobInfo() {
-        return jobInfo;
+    public byte[] getCompressedMeta() throws IOException {
+        return GZIPUtils.compress(meta);
+    }
+
+    public byte[] getCompressedJobInfo() throws IOException {
+        return GZIPUtils.compress(jobInfo);
+    }
+
+    public byte[] getMeta() throws IOException {
+        return Files.readAllBytes(meta.toPath());
+    }
+
+    public byte[] getJobInfo() throws IOException {
+        return Files.readAllBytes(jobInfo.toPath());
     }
 
     public long getExpiredAt() {
@@ -68,16 +104,10 @@ public class Snapshot {
         return commitSeq;
     }
 
-    public String toJson() {
-        return GsonUtils.GSON.toJson(this);
-    }
-
     @Override
     public String toString() {
         return "Snapshot{"
                 + "label='" + label + '\''
-                + ", meta=" + meta
-                + ", jobInfo=" + jobInfo
                 + ", expiredAt=" + expiredAt
                 + ", commitSeq=" + commitSeq
                 + '}';
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
index c76368f4e64..673e8f1700b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RestoreCommand.java
@@ -18,6 +18,8 @@
 package org.apache.doris.nereids.trees.plans.commands;
 
 import org.apache.doris.analysis.StmtType;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Repository;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
@@ -94,8 +96,8 @@ public class RestoreCommand extends Command implements 
ForwardWithSync {
     private final Map<String, String> properties;
     private final boolean isExclude;
     private long timeoutMs;
-    private byte[] meta = null;
-    private byte[] jobInfo = null;
+    private BackupMeta meta = null;
+    private BackupJobInfo jobInfo = null;
     private String storageVaultName = null;
 
     /**
@@ -410,19 +412,19 @@ public class RestoreCommand extends Command implements 
ForwardWithSync {
         return isLocal;
     }
 
-    public byte[] getMeta() {
+    public BackupMeta getMeta() {
         return meta;
     }
 
-    public void setMeta(byte[] meta) {
+    public void setMeta(BackupMeta meta) {
         this.meta = meta;
     }
 
-    public byte[] getJobInfo() {
+    public BackupJobInfo getJobInfo() {
         return jobInfo;
     }
 
-    public void setJobInfo(byte[] jobInfo) {
+    public void setJobInfo(BackupJobInfo jobInfo) {
         this.jobInfo = jobInfo;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 70234ae52ba..83c53c532dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -29,6 +29,8 @@ import org.apache.doris.analysis.TableSample;
 import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.backup.BackupJobInfo;
+import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Snapshot;
 import org.apache.doris.binlog.BinlogLagInfo;
 import org.apache.doris.catalog.AutoIncrementGenerator;
@@ -62,7 +64,6 @@ import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.GZIPUtils;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
@@ -3125,24 +3126,38 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
             result.getStatus().addToErrorMsgs(String.format("snapshot %s is 
expired", label));
         } else {
-            byte[] meta = snapshot.getMeta();
-            byte[] jobInfo = snapshot.getJobInfo();
+            long metaSize = snapshot.getMetaSize();
+            long jobInfoSize = snapshot.getJobInfoSize();
+            long snapshotSize = snapshot.getMetaSize() + 
snapshot.getJobInfoSize();
+            if (metaSize + jobInfoSize >= Integer.MAX_VALUE && 
!request.isEnableCompress()) {
+                String msg = String.format(
+                        "Snapshot %s is too large (%d bytes > 2GB). Please 
enable compression to continue.",
+                        label, snapshotSize);
+                LOG.warn("get snapshot failed: {}", msg);
+                result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+                result.getStatus().addToErrorMsgs(msg);
+                return result;
+            }
+
             long expiredAt = snapshot.getExpiredAt();
             long commitSeq = snapshot.getCommitSeq();
 
             LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info 
size: {}, "
-                    + "expired at: {}, commit seq: {}", label, meta.length, 
jobInfo.length, expiredAt, commitSeq);
+                    + "expired at: {}, commit seq: {}", label, metaSize, 
jobInfoSize, expiredAt, commitSeq);
             if (request.isEnableCompress()) {
-                meta = GZIPUtils.compress(meta);
-                jobInfo = GZIPUtils.compress(jobInfo);
+                byte[] meta = snapshot.getCompressedMeta();
+                byte[] jobInfo = snapshot.getCompressedJobInfo();
+                result.setMeta(meta);
+                result.setJobInfo(jobInfo);
                 result.setCompressed(true);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("get snapshot info with compress, snapshot: {}, 
compressed meta "
                             + "size {}, compressed job info size {}", label, 
meta.length, jobInfo.length);
                 }
+            } else {
+                result.setMeta(snapshot.getMeta());
+                result.setJobInfo(snapshot.getJobInfo());
             }
-            result.setMeta(meta);
-            result.setJobInfo(jobInfo);
             result.setExpiredAt(expiredAt);
             result.setCommitSeq(commitSeq);
         }
@@ -3255,6 +3270,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             }
         }
 
+        BackupMeta backupMeta;
+        BackupJobInfo backupJobInfo;
         byte[] meta = request.getMeta();
         byte[] jobInfo = request.getJobInfo();
         if (Config.enable_restore_snapshot_rpc_compression && 
request.isCompressed()) {
@@ -3263,15 +3280,25 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                         meta.length, jobInfo.length);
             }
             try {
-                meta = GZIPUtils.decompress(meta);
-                jobInfo = GZIPUtils.decompress(jobInfo);
+                Pair<BackupMeta, BackupJobInfo> pair = 
Snapshot.readFromCompressedBytes(meta, jobInfo);
+                backupMeta = pair.first;
+                backupJobInfo = pair.second;
             } catch (Exception e) {
                 LOG.warn("decompress meta and job info failed", e);
                 throw new UserException("decompress meta and job info failed", 
e);
             }
-        } else if (GZIPUtils.isGZIPCompressed(jobInfo) || 
GZIPUtils.isGZIPCompressed(meta)) {
+        } else if (Snapshot.isCompressed(meta, jobInfo)) {
             throw new UserException("The request is compressed, but the config 
"
                     + "`enable_restore_snapshot_rpc_compressed` is not 
enabled.");
+        } else {
+            try {
+                Pair<BackupMeta, BackupJobInfo> pair = 
Snapshot.readFromBytes(meta, jobInfo);
+                backupMeta = pair.first;
+                backupJobInfo = pair.second;
+            } catch (Exception e) {
+                LOG.warn("deserialize meta and job info failed", e);
+                throw new UserException("deserialize meta and job info 
failed", e);
+            }
         }
 
         //instantiate RestoreCommand
@@ -3331,8 +3358,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
 
         RestoreCommand restoreCommand = new RestoreCommand(labelNameInfo, 
repoName, tableRefInfos, properties, false);
-        restoreCommand.setMeta(meta);
-        restoreCommand.setJobInfo(jobInfo);
+        restoreCommand.setMeta(backupMeta);
+        restoreCommand.setJobInfo(backupJobInfo);
         restoreCommand.setIsBeingSynced();
         LOG.debug("restore snapshot info, restoreCommand: {}", restoreCommand);
         try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to