This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37253328bea [feat](http) Add sync and export cloud meta API (#60739)
37253328bea is described below

commit 37253328bea712ae387dff140606eccd4c551ab0
Author: walter <[email protected]>
AuthorDate: Sat Feb 28 11:03:31 2026 +0800

    [feat](http) Add sync and export cloud meta API (#60739)
---
 cloud/src/common/bvars.cpp                         |   5 +
 cloud/src/common/bvars.h                           |   3 +
 cloud/src/meta-service/meta_service.h              |  11 +
 cloud/src/meta-service/meta_service_txn.cpp        |  56 ++++
 cloud/test/meta_service_test.cpp                   |  17 ++
 .../doris/cloud/persist/CloudMetaSyncPoint.java    |  69 +++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |   5 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   5 +
 .../httpv2/rest/manager/MetaBackupAction.java      | 329 +++++++++++++++++++++
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |  10 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 gensrc/proto/cloud.proto                           |  12 +
 13 files changed, 529 insertions(+)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 91b1e0bb649..f2dd86d8d1c 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -39,6 +39,7 @@ BvarLatencyRecorderWithTag 
g_bvar_ms_commit_txn_eventually("ms", "commit_txn_eve
 BvarLatencyRecorderWithTag g_bvar_ms_abort_txn("ms", "abort_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn("ms", "get_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id("ms", 
"get_current_max_txn_id");
+BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point("ms", 
"create_meta_sync_point");
 BvarLatencyRecorderWithTag g_bvar_ms_begin_sub_txn("ms", "begin_sub_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_abort_sub_txn("ms", "abort_sub_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict("ms", 
"check_txn_conflict");
@@ -467,6 +468,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_abort_txn_del_counter("rpc_kv_abort_txn_del_counte
 mBvarInt64Adder 
g_bvar_rpc_kv_get_txn_get_counter("rpc_kv_get_txn_get_counter",{"instance_id"});
 // get_current_max_txn_id
 mBvarInt64Adder 
g_bvar_rpc_kv_get_current_max_txn_id_get_counter("rpc_kv_get_current_max_txn_id_get_counter",{"instance_id"});
+// create_meta_sync_point
+mBvarInt64Adder 
g_bvar_rpc_kv_create_meta_sync_point_del_counter("rpc_kv_create_meta_sync_point_del_counter",{"instance_id"});
 // begin_sub_txn
 mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_get_counter("rpc_kv_begin_sub_txn_get_counter",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_put_counter("rpc_kv_begin_sub_txn_put_counter",{"instance_id"});
@@ -669,6 +672,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_abort_txn_del_bytes("rpc_kv_abort_txn_del_bytes",{
 mBvarInt64Adder 
g_bvar_rpc_kv_get_txn_get_bytes("rpc_kv_get_txn_get_bytes",{"instance_id"});
 // get_current_max_txn_id
 mBvarInt64Adder 
g_bvar_rpc_kv_get_current_max_txn_id_get_bytes("rpc_kv_get_current_max_txn_id_get_bytes",{"instance_id"});
+// create_meta_sync_point
+mBvarInt64Adder 
g_bvar_rpc_kv_create_meta_sync_point_del_bytes("rpc_kv_create_meta_sync_point_del_bytes",{"instance_id"});
 // begin_sub_txn
 mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_get_bytes("rpc_kv_begin_sub_txn_get_bytes",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_begin_sub_txn_put_bytes("rpc_kv_begin_sub_txn_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index e4b9789c1bf..695a9c0206b 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -551,6 +551,7 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_commit_txn_eventually;
 extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_current_max_txn_id;
+extern BvarLatencyRecorderWithTag g_bvar_ms_create_meta_sync_point;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_txn_conflict;
 extern BvarLatencyRecorderWithTag g_bvar_ms_abort_txn_with_coordinator;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_prepare_txn_by_coordinator;
@@ -887,6 +888,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_counter;
@@ -1026,6 +1028,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_abort_txn_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_current_max_txn_id_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_create_meta_sync_point_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_begin_sub_txn_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index f26dc3ad360..2e4e2790f4b 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -125,6 +125,10 @@ public:
                                 const GetCurrentMaxTxnRequest* request,
                                 GetCurrentMaxTxnResponse* response,
                                 ::google::protobuf::Closure* done) override;
+    void create_meta_sync_point(::google::protobuf::RpcController* controller,
+                                const CreateMetaSyncPointRequest* request,
+                                CreateMetaSyncPointResponse* response,
+                                ::google::protobuf::Closure* done) override;
 
     void begin_sub_txn(::google::protobuf::RpcController* controller,
                        const BeginSubTxnRequest* request, BeginSubTxnResponse* 
response,
@@ -570,6 +574,13 @@ public:
         call_impl(&cloud::MetaService::get_current_max_txn_id, controller, 
request, response, done);
     }
 
+    void create_meta_sync_point(::google::protobuf::RpcController* controller,
+                                const CreateMetaSyncPointRequest* request,
+                                CreateMetaSyncPointResponse* response,
+                                ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::create_meta_sync_point, controller, 
request, response, done);
+    }
+
     void begin_sub_txn(::google::protobuf::RpcController* controller,
                        const BeginSubTxnRequest* request, BeginSubTxnResponse* 
response,
                        ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index e707883105b..cd7b62e7e6d 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -46,6 +46,8 @@ using namespace std::chrono;
 
 namespace doris::cloud {
 
+static constexpr std::string_view kMetaSyncPointDummyKey = 
"__meta_service_sync_point_dummy_key__";
+
 struct TableStats {
     int64_t updated_row_count = 0;
 
@@ -3628,6 +3630,60 @@ void 
MetaServiceImpl::get_current_max_txn_id(::google::protobuf::RpcController*
     response->set_current_max_txn_id(current_max_txn_id);
 }
 
+void 
MetaServiceImpl::create_meta_sync_point(::google::protobuf::RpcController* 
controller,
+                                             const CreateMetaSyncPointRequest* 
request,
+                                             CreateMetaSyncPointResponse* 
response,
+                                             ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(create_meta_sync_point, del);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(create_meta_sync_point)
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        msg = "failed to create txn";
+        code = cast_as<ErrCategory::CREATE>(err);
+        return;
+    }
+
+    txn->enable_get_versionstamp();
+    txn->remove(kMetaSyncPointDummyKey);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "txn->commit() failed, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    int64_t committed_version = 0;
+    err = txn->get_committed_version(&committed_version);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "get committed version failed, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    Versionstamp versionstamp;
+    err = txn->get_versionstamp(&versionstamp);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "get versionstamp failed, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    response->set_committed_version(committed_version);
+    response->set_versionstamp(versionstamp.to_string());
+}
+
 /**
  * 1. Generate a sub_txn_id
  *
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index f2f623cf1c1..f815c0d3244 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -2671,6 +2671,23 @@ TEST(MetaServiceTest, GetCurrentMaxTxnIdTest) {
     ASSERT_GE(max_txn_id_res.current_max_txn_id(), begin_txn_res.txn_id());
 }
 
+TEST(MetaServiceTest, CreateMetaSyncPointTest) {
+    auto meta_service = get_meta_service();
+    const std::string cloud_unique_id = "test_cloud_unique_id";
+
+    brpc::Controller cntl;
+    CreateMetaSyncPointRequest req;
+    CreateMetaSyncPointResponse resp;
+    req.set_cloud_unique_id(cloud_unique_id);
+
+    meta_service->create_meta_sync_point(
+            reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, 
&resp, nullptr);
+
+    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
+    ASSERT_GT(resp.committed_version(), 0);
+    ASSERT_EQ(resp.versionstamp().size(), 20);
+}
+
 TEST(MetaServiceTest, AbortTxnWithCoordinatorTest) {
     auto meta_service = get_meta_service();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
new file mode 100644
index 00000000000..e226bad144d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/persist/CloudMetaSyncPoint.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class CloudMetaSyncPoint implements Writable {
+    @SerializedName(value = "committedVersion")
+    private long committedVersion;
+
+    @SerializedName(value = "versionStamp")
+    private String versionStamp;
+
+    @SerializedName(value = "createTimeMs")
+    private long createTimeMs;
+
+    public CloudMetaSyncPoint() {
+    }
+
+    public CloudMetaSyncPoint(long committedVersion, String versionStamp, long 
createTimeMs) {
+        this.committedVersion = committedVersion;
+        this.versionStamp = versionStamp;
+        this.createTimeMs = createTimeMs;
+    }
+
+    public long getCommittedVersion() {
+        return committedVersion;
+    }
+
+    public String getVersionStamp() {
+        return versionStamp;
+    }
+
+    public long getCreateTimeMs() {
+        return createTimeMs;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static CloudMetaSyncPoint read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), 
CloudMetaSyncPoint.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index a27a823c7c1..2f85ef17a8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -221,6 +221,11 @@ public class MetaServiceClient {
                 .getCurrentMaxTxnId(request);
     }
 
+    public Cloud.CreateMetaSyncPointResponse 
createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request) {
+        return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
+                .createMetaSyncPoint(request);
+    }
+
     public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest 
request) {
         if (!request.hasCloudUniqueId()) {
             Cloud.BeginSubTxnRequest.Builder builder = 
Cloud.BeginSubTxnRequest.newBuilder();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index e1cb45401db..90e3a9276cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -387,6 +387,11 @@ public class MetaServiceProxy {
         return executeWithMetrics("getCurrentMaxTxnId", (client) -> 
client.getCurrentMaxTxnId(request));
     }
 
+    public Cloud.CreateMetaSyncPointResponse 
createMetaSyncPoint(Cloud.CreateMetaSyncPointRequest request)
+            throws RpcException {
+        return executeWithMetrics("createMetaSyncPoint", (client) -> 
client.createMetaSyncPoint(request));
+    }
+
     public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest 
request)
             throws RpcException {
         return executeWithMetrics("beginSubTxn", (client) -> 
client.beginSubTxn(request));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
new file mode 100644
index 00000000000..71a053e25fc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/MetaBackupAction.java
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest.manager;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.bdbje.BDBJEJournal;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.Storage;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.util.DbBackup;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import org.apache.commons.io.FileUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/rest/v2/manager/backup")
+public class MetaBackupAction extends RestBaseController {
+    private static final String ALLOW_REDIRECT = "allow_redirect";
+
+    @PostMapping("/sync_cloud_meta")
+    public Object syncCloudMeta(HttpServletRequest request, 
HttpServletResponse response) {
+        if (!Config.isCloudMode()) {
+            return ResponseEntityBuilder.okWithCommonError("/sync_cloud_meta 
only works on the cloud mode");
+        }
+        try {
+            if (needRedirect(request.getScheme())) {
+                return redirectToHttps(request);
+            }
+            executeCheckPassword(request, response);
+            
checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(),
 PrivPredicate.ADMIN);
+            Object redirectOrError = checkMasterAndRedirectIfNeeded(request, 
response);
+            if (redirectOrError != null) {
+                return redirectOrError;
+            }
+
+            synchronized (Env.getCurrentEnv().getEditLog()) {
+                MetaSyncPointVersion syncVersion = createMetaSyncPoint();
+                CloudMetaSyncPoint syncPoint = new 
CloudMetaSyncPoint(syncVersion.committedVersion,
+                        syncVersion.versionStamp,
+                        System.currentTimeMillis());
+                long journalId = 
Env.getCurrentEnv().getEditLog().logMetaSyncPoint(syncPoint);
+
+                Map<String, Object> data = new HashMap<>();
+                data.put("journal_id", journalId);
+                data.put("committed_version", syncVersion.committedVersion);
+                data.put("versionstamp", syncVersion.versionStamp);
+                return ResponseEntityBuilder.ok(data);
+            }
+        } catch (Exception e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
+    }
+
+    @PostMapping("/export_meta")
+    public Object exportMeta(@RequestBody ExportMetaRequest req,
+            HttpServletRequest request, HttpServletResponse response) {
+        if (!Config.isCloudMode()) {
+            return ResponseEntityBuilder.okWithCommonError("/export_meta only 
works on the cloud mode");
+        }
+        try {
+            if (needRedirect(request.getScheme())) {
+                return redirectToHttps(request);
+            }
+            executeCheckPassword(request, response);
+            
checkGlobalAuth(org.apache.doris.qe.ConnectContext.get().getCurrentUserIdentity(),
 PrivPredicate.ADMIN);
+            Object redirectOrError = checkMasterAndRedirectIfNeeded(request, 
response);
+            if (redirectOrError != null) {
+                return redirectOrError;
+            }
+
+            if (req == null || Strings.isNullOrEmpty(req.getTargetDir())) {
+                return ResponseEntityBuilder.badRequest("target_dir is 
required");
+            }
+            File targetDir = prepareTargetDir(req.getTargetDir());
+            if (Env.getCurrentEnv().getCheckpointer() != null) {
+                
Env.getCurrentEnv().getCheckpointer().getLock().readLock().lock();
+            }
+            try {
+                CopiedImage copiedImage = copyLatestImageIfExists(targetDir);
+                copyImageMetaFiles(targetDir);
+                BdbExportResult bdbResult = exportBdbJe(targetDir, 
copiedImage.version, copiedImage.exists);
+
+                Map<String, Object> data = new HashMap<>();
+                data.put("target_dir", targetDir.getAbsolutePath());
+                data.put("bdb_dir", new File(targetDir, 
"bdb").getAbsolutePath());
+                data.put("bdb_file_count", bdbResult.fileCount);
+                data.put("image_file", copiedImage.exists ? 
copiedImage.file.getName() : null);
+                data.put("image_version", copiedImage.version);
+                data.put("image_exported", copiedImage.exists);
+                data.put("journal_upper_bound", bdbResult.journalUpperBound);
+                return ResponseEntityBuilder.ok(data);
+            } finally {
+                if (Env.getCurrentEnv().getCheckpointer() != null) {
+                    
Env.getCurrentEnv().getCheckpointer().getLock().readLock().unlock();
+                }
+            }
+        } catch (Exception e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
+    }
+
+    private Object checkMasterAndRedirectIfNeeded(HttpServletRequest request, 
HttpServletResponse response)
+            throws Exception {
+        if (Env.getCurrentEnv().isMaster()) {
+            return null;
+        }
+        if (Boolean.parseBoolean(request.getParameter(ALLOW_REDIRECT))) {
+            return redirectToMasterOrException(request, response);
+        }
+        return ResponseEntityBuilder.okWithCommonError(
+                "current fe is not master, master is "
+                        + Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort());
+    }
+
+    private MetaSyncPointVersion createMetaSyncPoint() throws DdlException {
+        Cloud.CreateMetaSyncPointRequest req = 
Cloud.CreateMetaSyncPointRequest.newBuilder()
+                .setCloudUniqueId(Config.cloud_unique_id)
+                .setRequestIp(FrontendOptions.getLocalHostAddressCached())
+                .build();
+        try {
+            Cloud.CreateMetaSyncPointResponse resp = 
MetaServiceProxy.getInstance().createMetaSyncPoint(req);
+            if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                throw new DdlException("create_meta_sync_point failed: " + 
resp.getStatus().getMsg());
+            }
+            if (!resp.hasCommittedVersion()) {
+                throw new DdlException("meta service response missing 
committed_version");
+            }
+            if (!resp.hasVersionstamp() || 
Strings.isNullOrEmpty(resp.getVersionstamp())) {
+                throw new DdlException("meta service response missing 
versionstamp");
+            }
+            return new MetaSyncPointVersion(resp.getCommittedVersion(), 
resp.getVersionstamp());
+        } catch (RpcException e) {
+            throw new DdlException("create_meta_sync_point rpc failed: " + 
e.getMessage());
+        }
+    }
+
+    private static class MetaSyncPointVersion {
+        private final long committedVersion;
+        private final String versionStamp;
+
+        MetaSyncPointVersion(long committedVersion, String versionStamp) {
+            this.committedVersion = committedVersion;
+            this.versionStamp = versionStamp;
+        }
+    }
+
+    private static File prepareTargetDir(String targetDir) throws IOException {
+        File dir = new File(targetDir).getCanonicalFile();
+        if (dir.exists()) {
+            if (!dir.isDirectory()) {
+                throw new IOException("target_dir exists but is not a 
directory: " + dir.getAbsolutePath());
+            }
+            FileUtils.cleanDirectory(dir);
+        } else {
+            FileUtils.forceMkdir(dir);
+        }
+        return dir;
+    }
+
+    private BdbExportResult exportBdbJe(File targetDir, long imageVersion, 
boolean hasImage) throws Exception {
+        if (!"bdb".equalsIgnoreCase(Config.edit_log_type)) {
+            throw new DdlException("only bdb edit_log_type supports bdbje 
export");
+        }
+        Journal journal = Env.getCurrentEnv().getEditLog().getJournal();
+        if (!(journal instanceof BDBJEJournal)) {
+            throw new DdlException("current edit log is not BDBJEJournal");
+        }
+
+        BDBJEJournal bdbjeJournal = (BDBJEJournal) journal;
+        if (bdbjeJournal.getBDBEnvironment() == null) {
+            throw new DdlException("bdb environment is not initialized");
+        }
+        ReplicatedEnvironment replicatedEnvironment = 
bdbjeJournal.getBDBEnvironment().getReplicatedEnvironment();
+        if (replicatedEnvironment == null) {
+            throw new DdlException("bdb replicated environment is not ready");
+        }
+
+        File bdbTargetDir = new File(targetDir, "bdb");
+        FileUtils.forceMkdir(bdbTargetDir);
+        File bdbSourceDir = new File(Env.getCurrentEnv().getBdbDir());
+
+        DbBackup backup = new DbBackup(replicatedEnvironment);
+        backup.startBackup();
+        try {
+            long journalUpperBound = bdbjeJournal.getMaxJournalId();
+            if (hasImage) {
+                long journalMinId = bdbjeJournal.getMinJournalId();
+                if (journalMinId > 0 && journalMinId > imageVersion + 1) {
+                    throw new DdlException("export failed: bdb min journal id 
" + journalMinId
+                            + " is greater than image_version + 1 (" + 
(imageVersion + 1) + ")");
+                }
+                if (journalUpperBound < imageVersion) {
+                    throw new DdlException("export failed: bdb journal upper 
bound " + journalUpperBound
+                            + " is smaller than image_version " + 
imageVersion);
+                }
+            }
+            String[] files = backup.getLogFilesInBackupSet();
+            for (String fileName : files) {
+                FileUtils.copyFile(new File(bdbSourceDir, fileName), new 
File(bdbTargetDir, fileName));
+            }
+            return new BdbExportResult(files.length, journalUpperBound);
+        } finally {
+            backup.endBackup();
+        }
+    }
+
+    private CopiedImage copyLatestImageIfExists(File targetDir) throws 
IOException {
+        File imageTargetDir = new File(targetDir, "image");
+        Storage storage = new Storage(Env.getServingEnv().getImageDir());
+        long imageVersion = storage.getLatestImageSeq();
+        File image = storage.getImageFile(imageVersion);
+        if (!image.exists()) {
+            return CopiedImage.notFound(imageVersion);
+        }
+        File targetImage = new File(imageTargetDir, image.getName());
+        linkOrCopyFile(image, targetImage);
+        return CopiedImage.found(targetImage, imageVersion);
+    }
+
+    private void copyImageMetaFiles(File targetDir) throws IOException {
+        File imageTargetDir = new File(targetDir, "image");
+        Storage storage = new Storage(Env.getServingEnv().getImageDir());
+        File[] metaFiles = new File[] {
+                storage.getModeFile(),
+                storage.getRoleFile(),
+                storage.getVersionFile()
+        };
+        for (File source : metaFiles) {
+            if (!source.exists()) {
+                continue;
+            }
+            linkOrCopyFile(source, new File(imageTargetDir, source.getName()));
+        }
+    }
+
+    private void linkOrCopyFile(File source, File target) throws IOException {
+        try {
+            Files.createLink(target.toPath(), source.toPath());
+        } catch (UnsupportedOperationException | SecurityException | 
FileAlreadyExistsException e) {
+            FileUtils.copyFile(source, target);
+        } catch (IOException e) {
+            FileUtils.copyFile(source, target);
+        }
+    }
+
+    private static class CopiedImage {
+        private final File file;
+        private final long version;
+        private final boolean exists;
+
+        CopiedImage(File file, long version, boolean exists) {
+            this.file = file;
+            this.version = version;
+            this.exists = exists;
+        }
+
+        static CopiedImage found(File file, long version) {
+            return new CopiedImage(file, version, true);
+        }
+
+        static CopiedImage notFound(long version) {
+            return new CopiedImage(null, version, false);
+        }
+    }
+
+    private static class BdbExportResult {
+        private final int fileCount;
+        private final long journalUpperBound;
+
+        BdbExportResult(int fileCount, long journalUpperBound) {
+            this.fileCount = fileCount;
+            this.journalUpperBound = journalUpperBound;
+        }
+    }
+
+    public static class ExportMetaRequest {
+        @JsonAlias({"targetDir"})
+        @JsonProperty("target_dir")
+        private String targetDir;
+
+        public String getTargetDir() {
+            return targetDir;
+        }
+
+        public void setTargetDir(String targetDir) {
+            this.targetDir = targetDir;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 76b4578f892..8f5e3f17865 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSearchDesc;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.cloud.CloudWarmUpJob;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
 import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
 import org.apache.doris.cloud.snapshot.SnapshotState;
 import org.apache.doris.cluster.Cluster;
@@ -990,6 +991,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_META_SYNC_POINT: {
+                data = CloudMetaSyncPoint.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index d506b474ed7..c7384c7e715 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.FunctionSearchDesc;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.cloud.CloudWarmUpJob;
 import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.persist.CloudMetaSyncPoint;
 import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
 import org.apache.doris.cloud.snapshot.SnapshotState;
 import org.apache.doris.common.Config;
@@ -1424,6 +1425,11 @@ public class EditLog {
                     // TODO: implement
                     break;
                 }
+                case OperationType.OP_META_SYNC_POINT: {
+                    // CloudMetaSyncPoint info = (CloudMetaSyncPoint) 
journal.getData();
+                    // This log is only used to keep FE/MS cut point in 
journal timeline.
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}, log id: {}", opCode, 
logId, e);
@@ -2537,4 +2543,8 @@ public class EditLog {
     public long logBeginSnapshot(SnapshotState snapshotState) {
         return logEdit(OperationType.OP_BEGIN_SNAPSHOT, snapshotState);
     }
+
+    public long logMetaSyncPoint(CloudMetaSyncPoint syncPoint) {
+        return logEdit(OperationType.OP_META_SYNC_POINT, syncPoint);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 1174d9c3874..016903129c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -426,6 +426,7 @@ public class OperationType {
     public static final short OP_MODIFY_CLOUD_WARM_UP_JOB = 1002;
 
     public static final short OP_BEGIN_SNAPSHOT = 1100;
+    public static final short OP_META_SYNC_POINT = 1101;
 
     /**
      * Get opcode name by op code.
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6c14b3b1ee4..1f61fcab17d 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1113,6 +1113,17 @@ message GetCurrentMaxTxnResponse {
     optional int64 current_max_txn_id = 2;
 }
 
+message CreateMetaSyncPointRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional string request_ip = 2;
+}
+
+message CreateMetaSyncPointResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional int64 committed_version = 2;
+    optional string versionstamp = 3;
+}
+
 message AbortTxnWithCoordinatorRequest {
     optional string cloud_unique_id = 1; // For auth
     optional string ip = 2;
@@ -2234,6 +2245,7 @@ service MetaService {
     rpc abort_txn(AbortTxnRequest) returns (AbortTxnResponse);
     rpc get_txn(GetTxnRequest) returns (GetTxnResponse);
     rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns 
(GetCurrentMaxTxnResponse);
+    rpc create_meta_sync_point(CreateMetaSyncPointRequest) returns 
(CreateMetaSyncPointResponse);
     rpc check_txn_conflict(CheckTxnConflictRequest) returns 
(CheckTxnConflictResponse);
     rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
     rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to