This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 677de342856 [feat](restore) Support compressed snapshot meta and job
info (#43516) (#43568)
677de342856 is described below
commit 677de3428564f00a0a7b5b338d7b8cd1c97f5bf6
Author: walter <[email protected]>
AuthorDate: Mon Nov 11 17:03:21 2024 +0800
[feat](restore) Support compressed snapshot meta and job info (#43516)
(#43568)
Cherry-pick #43516
---
fe/fe-common/pom.xml | 4 ++
.../main/java/org/apache/doris/common/Config.java | 9 ++++
.../java/org/apache/doris/common/GZIPUtils.java | 48 ++++++++++++++++++++++
.../apache/doris/service/FrontendServiceImpl.java | 48 ++++++++++++++++++----
gensrc/thrift/FrontendService.thrift | 3 ++
5 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml
index 9ca13a62857..0f6fca30048 100644
--- a/fe/fe-common/pom.xml
+++ b/fe/fe-common/pom.xml
@@ -89,6 +89,10 @@ under the License.
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
<build>
<finalName>doris-fe-common</finalName>
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 954dde05005..7fbb4745ac7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1506,6 +1506,15 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static boolean backup_job_compressed_serialization = false;
+ /**
+ * An internal config, to indicate whether to enable the restore snapshot
rpc compression.
+ *
+ * The ccr syncer depends on this config to decide whether to compress
the meta and job
+ * info of the restore snapshot request.
+ */
+ @ConfField(mutable = false)
+ public static boolean enable_restore_snapshot_rpc_compression = true;
+
/**
* Control the max num of tablets per backup job involved.
*/
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
new file mode 100644
index 00000000000..7408e2888cc
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GZIPUtils {
+ public static boolean isGZIPCompressed(byte[] data) {
+ // From RFC 1952, section 2.3.1: every gzip member starts with the magic bytes
(ID1 = 0x1F, ID2 = 0x8B)
+ return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte)
0x8B;
+ }
+
+ public static byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
+ gzipStream.write(data);
+ }
+ return bytesStream.toByteArray();
+ }
+
+ public static byte[] decompress(byte[] data) throws IOException {
+ ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
+ try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
+ return IOUtils.toByteArray(gzipStream);
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 25fa5e1524c..d83ff7e0815 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@@ -208,6 +209,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@@ -2765,8 +2767,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// getSnapshotImpl
private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request,
String clientIp)
- throws UserException {
- // Step 1: Check all required arg: user, passwd, db, label_name,
snapshot_name, snapshot_type
+ throws UserException, IOException {
+ // Step 1: Check all required arg: user, passwd, db, label_name,
snapshot_name,
+ // snapshot_type
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
@@ -2811,10 +2814,22 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs(String.format("snapshot %s not
exist", label));
} else {
- result.setMeta(snapshot.getMeta());
- result.setJobInfo(snapshot.getJobInfo());
+ byte[] meta = snapshot.getMeta();
+ byte[] jobInfo = snapshot.getJobInfo();
+
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info
size: {}",
- label, snapshot.getMeta().length,
snapshot.getJobInfo().length);
+ label, meta.length, jobInfo.length);
+ if (request.isEnableCompress()) {
+ meta = GZIPUtils.compress(meta);
+ jobInfo = GZIPUtils.compress(jobInfo);
+ result.setCompressed(true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get snapshot info with compress, snapshot: {},
compressed meta "
+ + "size {}, compressed job info size {}", label,
meta.length, jobInfo.length);
+ }
+ }
+ result.setMeta(meta);
+ result.setJobInfo(jobInfo);
}
return result;
@@ -2928,8 +2943,27 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
restoreTableRefClause = new
AbstractBackupTableRefClause(isExclude, tableRefs);
}
}
- RestoreStmt restoreStmt = new RestoreStmt(label, repoName,
restoreTableRefClause, properties, request.getMeta(),
- request.getJobInfo());
+
+ byte[] meta = request.getMeta();
+ byte[] jobInfo = request.getJobInfo();
+ if (Config.enable_restore_snapshot_rpc_compression &&
request.isCompressed()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("decompress meta and job info, compressed meta size
{}, compressed job info size {}",
+ meta.length, jobInfo.length);
+ }
+ try {
+ meta = GZIPUtils.decompress(meta);
+ jobInfo = GZIPUtils.decompress(jobInfo);
+ } catch (Exception e) {
+ LOG.warn("decompress meta and job info failed", e);
+ throw new UserException("decompress meta and job info failed",
e);
+ }
+ } else if (GZIPUtils.isGZIPCompressed(jobInfo) ||
GZIPUtils.isGZIPCompressed(meta)) {
+ throw new UserException("The request is compressed, but the config
"
+ + "`enable_restore_snapshot_rpc_compressed` is not
enabled.");
+ }
+
+ RestoreStmt restoreStmt = new RestoreStmt(label, repoName,
restoreTableRefClause, properties, meta, jobInfo);
restoreStmt.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 0140eeff5e1..575643030e1 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1077,6 +1077,7 @@ struct TGetSnapshotRequest {
7: optional string label_name
8: optional string snapshot_name
9: optional TSnapshotType snapshot_type
+ 10: optional bool enable_compress;
}
struct TGetSnapshotResult {
@@ -1084,6 +1085,7 @@ struct TGetSnapshotResult {
2: optional binary meta
3: optional binary job_info
4: optional Types.TNetworkAddress master_address
+ 5: optional bool compressed;
}
struct TTableRef {
@@ -1107,6 +1109,7 @@ struct TRestoreSnapshotRequest {
13: optional bool clean_tables
14: optional bool clean_partitions
15: optional bool atomic_restore
+ 16: optional bool compressed;
}
struct TRestoreSnapshotResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]