This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 56aef960cc7 branch-4.0: [improve](job) support change offset for streaming job in cloud mode #57264 (#57434)
56aef960cc7 is described below

commit 56aef960cc7aa1d9f9eba980e5b7925ab1c121b4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 6 19:48:03 2025 +0800

    branch-4.0: [improve](job) support change offset for streaming job in cloud mode #57264 (#57434)
    
    Cherry-picked from #57264
    
    Co-authored-by: wudi <[email protected]>
---
 cloud/src/common/bvars.cpp                         |   9 ++
 cloud/src/common/bvars.h                           |   7 +
 cloud/src/meta-service/meta_service.h              |  13 ++
 cloud/src/meta-service/meta_service_txn.cpp        | 104 +++++++++++++
 cloud/test/meta_service_job_test.cpp               | 143 +++++++++++++++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  12 ++
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   5 +
 .../insert/streaming/StreamingInsertJob.java       |  39 ++++-
 gensrc/proto/cloud.proto                           |  13 ++
 ...st_streaming_job_alter_offset_restart_fe.groovy | 172 +++++++++++++++++++++
 10 files changed, 513 insertions(+), 4 deletions(-)
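
For context: this change adds a reset_streaming_job_offset RPC to the cloud
meta service so that ALTER JOB can change a streaming job's offset in cloud
mode. Below is a minimal FE-side sketch of the update variant, mirroring the
new resetCloudProgress() helper in StreamingInsertJob.java further down; the
ids and the offset JSON are illustrative values, not taken from this commit.

    // Sketch only: build a reset request the way resetCloudProgress() does
    // and send it through the FE's MetaServiceProxy singleton.
    Cloud.ResetStreamingJobOffsetRequest.Builder builder =
            Cloud.ResetStreamingJobOffsetRequest.newBuilder();
    builder.setCloudUniqueId(Config.cloud_unique_id);
    builder.setDbId(1003L);   // example db id
    builder.setJobId(2003L);  // example job id
    builder.setOffset("{\"fileName\":\"example_0.csv\"}"); // example offset JSON
    try {
        Cloud.ResetStreamingJobOffsetResponse response =
                MetaServiceProxy.getInstance().resetStreamingJobOffset(builder.build());
        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
            throw new JobException(response.getStatus().getMsg());
        }
    } catch (RpcException e) {
        throw new JobException(e.getMessage());
    }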

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 4f8da0076cf..ac36976553b 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -91,6 +91,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", "get_streaming_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job("ms", "delete_streaming_job");
+BvarLatencyRecorderWithTag g_bvar_ms_reset_streaming_job_offset("ms", "reset_streaming_job_offset");
 BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
@@ -386,6 +387,10 @@ mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_
 mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
 // delete_streaming_job
 mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter("rpc_kv_delete_streaming_job_del_counter",{"instance_id"});
+// reset_streaming_job_offset
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_counter("rpc_kv_reset_streaming_job_offset_get_counter",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_counter("rpc_kv_reset_streaming_job_offset_put_counter",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_counter("rpc_kv_reset_streaming_job_offset_del_counter",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -578,6 +583,10 @@ mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl
 mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
 // delete_streaming_job
 mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes("rpc_kv_delete_streaming_job_del_bytes",{"instance_id"});
+// reset_streaming_job_offset
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_bytes("rpc_kv_reset_streaming_job_offset_get_bytes",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_bytes("rpc_kv_reset_streaming_job_offset_put_bytes",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_bytes("rpc_kv_reset_streaming_job_offset_del_bytes",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 0153248c499..5641a2a30ce 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -607,6 +607,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job;
+extern BvarLatencyRecorderWithTag g_bvar_ms_reset_streaming_job_offset;
 extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -838,6 +839,9 @@ extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -970,6 +974,9 @@ extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index f84fa396d86..0e5ca28aee6 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -340,6 +340,11 @@ public:
                               DeleteStreamingJobResponse* response,
                               ::google::protobuf::Closure* done) override;
 
+    void reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                    const ResetStreamingJobOffsetRequest* request,
+                                    ResetStreamingJobOffsetResponse* response,
+                                    ::google::protobuf::Closure* done) override;
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -861,6 +866,14 @@ public:
         call_impl(&cloud::MetaService::delete_streaming_job, controller, request, response, done);
     }
 
+    void reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                    const ResetStreamingJobOffsetRequest* request,
+                                    ResetStreamingJobOffsetResponse* response,
+                                    ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::reset_streaming_job_offset, controller, request, response,
+                  done);
+    }
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index d2bede30fc4..cf7ef31ba68 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -901,6 +901,110 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
     }
 }
 
+void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                                 const ResetStreamingJobOffsetRequest* request,
+                                                 ResetStreamingJobOffsetResponse* response,
+                                                 ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(reset_streaming_job_offset, get, put, del);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(reset_streaming_job_offset)
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
+    std::string streaming_job_val;
+
+    // If no offset provided, remove the streaming job progress
+    if (!request->has_offset()) {
+        txn->remove(streaming_job_key_str);
+        LOG(INFO) << "remove streaming_job_key key=" << 
hex(streaming_job_key_str);
+    } else {
+        // If offset is provided, update the streaming job progress
+        bool prev_existed = true;
+        StreamingTaskCommitAttachmentPB prev_job_info;
+        err = txn->get(streaming_job_key_str, &streaming_job_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                prev_existed = false;
+            } else {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get streaming job progress, db_id=" << db_id
+                   << " job_id=" << job_id << " err=" << err;
+                msg = ss.str();
+                return;
+            }
+        }
+
+        if (prev_existed) {
+            if (!prev_job_info.ParseFromString(streaming_job_val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse streaming job offset, db_id=" << db_id
+                   << " job_id=" << job_id;
+                msg = ss.str();
+                return;
+            }
+        }
+
+        std::string new_job_val;
+        StreamingTaskCommitAttachmentPB new_job_info;
+
+        // Set the new offset
+        new_job_info.set_offset(request->offset());
+        new_job_info.set_job_id(job_id);
+
+        // Preserve existing statistics if they exist
+        if (prev_existed) {
+            new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
+            new_job_info.set_load_bytes(prev_job_info.load_bytes());
+            new_job_info.set_num_files(prev_job_info.num_files());
+            new_job_info.set_file_bytes(prev_job_info.file_bytes());
+        }
+
+        if (!new_job_info.SerializeToString(&new_job_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize new streaming job val, db_id=" << db_id
+               << " job_id=" << job_id;
+            msg = ss.str();
+            return;
+        }
+
+        txn->put(streaming_job_key_str, new_job_val);
+        LOG(INFO) << "reset offset, put streaming_job_key key=" << 
hex(streaming_job_key_str)
+                  << " prev job val: " << prev_job_info.ShortDebugString()
+                  << " new job val: " << new_job_info.ShortDebugString();
+    }
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit streaming job offset, db_id=" << db_id << " 
job_id=" << job_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
 void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
                                            const DeleteStreamingJobRequest* request,
                                            DeleteStreamingJobResponse* response,
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index a44b5d8537e..d3e2cea089e 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5259,4 +5259,147 @@ TEST(MetaServiceJobTest, DeleteJobKeyRemovesStreamingMeta) {
     ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
 }
 
+TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 1003;
+    int64_t job_id = 2003;
+
+    // First, create a streaming job by committing a txn
+    {
+        CommitTxnRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        // Begin a txn to obtain a valid txn_id and db_id mapping
+        {
+            brpc::Controller cntl_bt;
+            BeginTxnRequest bt_req;
+            BeginTxnResponse bt_res;
+            auto* txn_info = bt_req.mutable_txn_info();
+            txn_info->set_db_id(db_id);
+            txn_info->set_label("streaming_ut_reset_offset");
+            txn_info->add_table_ids(1);
+            txn_info->set_load_job_source_type(
+                    LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+            txn_info->set_timeout_ms(36000);
+            meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+            ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+            request.set_txn_id(bt_res.txn_id());
+            request.set_db_id(db_id);
+        }
+
+        TxnCommitAttachmentPB* attachment = request.mutable_commit_attachment();
+        attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+        StreamingTaskCommitAttachmentPB* streaming_attach =
+                attachment->mutable_streaming_task_txn_commit_attachment();
+        streaming_attach->set_job_id(job_id);
+        streaming_attach->set_offset("original_offset");
+        streaming_attach->set_scanned_rows(1000);
+        streaming_attach->set_load_bytes(5000);
+        streaming_attach->set_num_files(10);
+        streaming_attach->set_file_bytes(8000);
+
+        CommitTxnResponse response;
+        brpc::Controller cntl;
+        meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Verify initial state
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        request.set_db_id(db_id);
+        request.set_job_id(job_id);
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+        EXPECT_TRUE(response.has_commit_attach());
+        EXPECT_EQ(response.commit_attach().offset(), "original_offset");
+        EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
+    }
+
+    // Test case 1: Reset offset for existing job
+    {
+        ResetStreamingJobOffsetRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        request.set_db_id(db_id);
+        request.set_job_id(job_id);
+        request.set_offset("reset_offset");
+
+        ResetStreamingJobOffsetResponse response;
+        brpc::Controller cntl;
+        meta_service->reset_streaming_job_offset(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Verify offset was reset
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        request.set_db_id(db_id);
+        request.set_job_id(job_id);
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+        EXPECT_TRUE(response.has_commit_attach());
+        EXPECT_EQ(response.commit_attach().offset(), "reset_offset");
+        // Other fields should remain unchanged
+        EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
+        EXPECT_EQ(response.commit_attach().num_files(), 10);
+        EXPECT_EQ(response.commit_attach().file_bytes(), 8000);
+    }
+
+    // Test case 2: Reset offset multiple times
+    {
+        ResetStreamingJobOffsetRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        request.set_db_id(db_id);
+        request.set_job_id(job_id);
+        request.set_offset("second_reset_offset");
+
+        ResetStreamingJobOffsetResponse response;
+        brpc::Controller cntl;
+        meta_service->reset_streaming_job_offset(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+    }
+
+    // Verify the second reset
+    {
+        GetStreamingTaskCommitAttachRequest request;
+        request.set_cloud_unique_id(cloud_unique_id);
+        request.set_db_id(db_id);
+        request.set_job_id(job_id);
+
+        GetStreamingTaskCommitAttachResponse response;
+        brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+        EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+        EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+        EXPECT_TRUE(response.has_commit_attach());
+        EXPECT_EQ(response.commit_attach().offset(), "second_reset_offset");
+    }
+}
+
 } // namespace doris::cloud
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 027eed539bf..82ac565be39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -448,6 +448,18 @@ public class MetaServiceClient {
                 .resetRlProgress(request);
     }
 
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.ResetStreamingJobOffsetRequest.Builder builder =
+                    Cloud.ResetStreamingJobOffsetRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
+                    .resetStreamingJobOffset(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
+                .resetStreamingJobOffset(request);
+    }
+
     public Cloud.GetObjStoreInfoResponse
             getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
         if (!request.hasCloudUniqueId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 232e544f502..97fa72e4585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -419,6 +419,11 @@ public class MetaServiceProxy {
         return w.executeRequest((client) -> client.resetRLProgress(request));
     }
 
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
+            throws RpcException {
+        return w.executeRequest((client) -> client.resetStreamingJobOffset(request));
+    }
+
     public Cloud.GetObjStoreInfoResponse
             getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
         return w.executeRequest((client) -> client.getObjStoreInfo(request));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b4accf1b15e..a21a3c854fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -227,7 +227,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         return offset;
     }
 
-    public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException {
+    public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException, JobException {
         List<String> logParts = new ArrayList<>();
         // update sql
         if (StringUtils.isNotEmpty(alterJobCommand.getSql())) {
@@ -456,30 +456,61 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         setJobStatus(replayJob.getJobStatus());
         try {
             modifyPropertiesInternal(replayJob.getProperties());
+            // When a paused job is replayed after a restart, its cloud-side state also needs to be updated
+            if (Config.isCloudMode()) {
+                replayOnCloudMode();
+            }
         } catch (Exception e) {
             // should not happen
             log.error("replay modify streaming insert job properties failed, 
job id: {}", getJobId(), e);
         }
         setExecuteSql(replayJob.getExecuteSql());
+        setSucceedTaskCount(replayJob.getSucceedTaskCount());
+        setFailedTaskCount(replayJob.getFailedTaskCount());
+        setCanceledTaskCount(replayJob.getCanceledTaskCount());
     }
 
     /**
      * When updating offset, you need to reset the currentOffset
      */
-    private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException {
+    private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
         StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
         if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
+            Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
             this.offsetProvider.updateOffset(offset);
-
             if (Config.isCloudMode()) {
-                // todo: reset cloud currentOffset
+                resetCloudProgress(offset);
             }
         }
         this.properties.putAll(inputProperties);
         this.jobProperties = new StreamingJobProperties(this.properties);
     }
 
+    private void resetCloudProgress(Offset offset) throws JobException {
+        Cloud.ResetStreamingJobOffsetRequest.Builder builder = Cloud.ResetStreamingJobOffsetRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        builder.setDbId(getDbId());
+        builder.setJobId(getJobId());
+        builder.setOffset(offset.toSerializedJson());
+
+        Cloud.ResetStreamingJobOffsetResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().resetStreamingJobOffset(builder.build());
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                log.warn("failed to reset streaming job cloud offset, 
response: {}", response);
+                if (response.getStatus().getCode() == 
Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+                    log.warn("not found streaming job offset, response: {}", 
response);
+                    return;
+                } else {
+                    throw new JobException(response.getStatus().getMsg());
+                }
+            }
+        } catch (RpcException e) {
+            log.info("failed to reset cloud progress, ", e);
+            throw new JobException(e.getMessage());
+        }
+    }
+
     @Override
     public ShowResultSetMetaData getTaskMetaData() {
         return InsertJob.TASK_META_DATA;
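
A note on the error contract above: alterJob() now also declares JobException,
which resetCloudProgress() throws when the RPC fails or the meta service
returns a non-OK status; a ROUTINE_LOAD_PROGRESS_NOT_FOUND status is only
logged and tolerated, since a job with no recorded cloud progress has nothing
to reset. A hypothetical caller-side sketch ('job' and 'command' stand in for
real FE objects):

    // Sketch only: handling the new checked exception from alterJob().
    try {
        job.alterJob(command);
    } catch (JobException e) {
        // The cloud-side offset reset failed; surface the error instead of
        // letting FE state and meta-service state silently diverge.
        log.warn("alter streaming job failed, job id: {}", job.getJobId(), e);
    }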
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1a73cb905c2..8e5794f4787 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1923,6 +1923,18 @@ message DeleteStreamingJobResponse {
     optional MetaServiceResponseStatus status = 1;
 }
 
+message ResetStreamingJobOffsetRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+    optional string offset = 4;
+    optional string request_ip = 5;
+}
+
+message ResetStreamingJobOffsetResponse {
+    optional MetaServiceResponseStatus status = 1;
+}
+
 message CheckKeyInfos {
     repeated int64 db_ids = 1;
     repeated int64 table_ids = 2;
@@ -2133,6 +2145,7 @@ service MetaService {
     // streaming job meta
     rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest) returns (GetStreamingTaskCommitAttachResponse);
     rpc delete_streaming_job(DeleteStreamingJobRequest) returns (DeleteStreamingJobResponse);
+    rpc reset_streaming_job_offset(ResetStreamingJobOffsetRequest) returns (ResetStreamingJobOffsetResponse);
 
     // check KV
     rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
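
Note that the offset field is optional by design: as the meta_service_txn.cpp
handler earlier in this diff shows, a request that omits it removes the job's
progress key outright (the txn->remove branch), while a request that carries
it rewrites the key and preserves the accumulated statistics. A minimal
sketch of the clear variant, again with illustrative ids:

    // Sketch only: omitting setOffset() asks the meta service to delete the
    // streaming job's progress key instead of rewriting it.
    Cloud.ResetStreamingJobOffsetRequest clearRequest =
            Cloud.ResetStreamingJobOffsetRequest.newBuilder()
                    .setCloudUniqueId(Config.cloud_unique_id)
                    .setDbId(1003L)   // example db id
                    .setJobId(2003L)  // example job id
                    .build();
    try {
        MetaServiceProxy.getInstance().resetStreamingJobOffset(clearRequest);
    } catch (RpcException e) {
        throw new JobException(e.getMessage());
    }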
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
new file mode 100644
index 00000000000..e6deaeceeb8
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_job_alter_offset_restart_fe", "docker") {
+    def tableName = "test_streaming_job_alter_offset_restart_fe_tbl"
+    def jobName = "test_streaming_job_alter_offset_restart_fe_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    // run in both cloud and non-cloud mode
+    options.cloudMode = null
+    
+    docker(options) {
+        sql """drop table if exists `${tableName}` force"""
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int  NULL,
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // create streaming job
+        sql """
+           CREATE JOB ${jobName}  
+           PROPERTIES (
+            'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
+           ) 
+           ON STREAMING DO INSERT INTO ${tableName} 
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            // Wait for streaming job to process some data
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + jobSucceedCount)
+                        // check that the job has at least 1 succeeded task
+                        jobSucceedCount.size() == 1 && '1' <= jobSucceedCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+
+        sql """
+            PAUSE JOB where jobname =  '${jobName}'
+        """
+
+        // alter offset
+        sql """
+        ALTER JOB ${jobName} 
+        PROPERTIES (
+            'offset' = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+        )
+        """
+
+        jobInfo = sql """
+        select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+        // Restart FE
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        // check that the job state is consistent after the restart
+        def jobStatus = sql """
+            select status, SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobstatus: " + jobStatus)
+        assert jobStatus.get(0).get(0) == "PAUSED"
+        jobInfo = sql """
+        select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+        // resume to check whether consumption will resume
+        sql """
+        RESUME JOB where jobname = '${jobName}'
+        """
+        // wait for the job to reach RUNNING
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + jobStatus)
+                        // check that the job is RUNNING and has at least 2 succeeded tasks
+                        jobStatus.size() == 1 && jobStatus.get(0).get(0) == 'RUNNING' && '2' <= jobStatus.get(0).get(1)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        jobInfo = sql """
+        select currentOffset, endoffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+        sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
\ No newline at end of file

