This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 56aef960cc7 branch-4.0: [improve](job) support change offset for streaming job in cloud mode #57264 (#57434)
56aef960cc7 is described below
commit 56aef960cc7aa1d9f9eba980e5b7925ab1c121b4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 6 19:48:03 2025 +0800
branch-4.0: [improve](job) support change offset for streaming job in cloud mode #57264 (#57434)
Cherry-picked from #57264
Co-authored-by: wudi <[email protected]>
---
cloud/src/common/bvars.cpp | 9 ++
cloud/src/common/bvars.h | 7 +
cloud/src/meta-service/meta_service.h | 13 ++
cloud/src/meta-service/meta_service_txn.cpp | 104 +++++++++++++
cloud/test/meta_service_job_test.cpp | 143 +++++++++++++++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 12 ++
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 +
.../insert/streaming/StreamingInsertJob.java | 39 ++++-
gensrc/proto/cloud.proto | 13 ++
...st_streaming_job_alter_offset_restart_fe.groovy | 172 +++++++++++++++++++++
10 files changed, 513 insertions(+), 4 deletions(-)
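
Note on the end-to-end path: in cloud mode, altering a paused streaming job's 'offset' property now propagates the new offset to the meta service instead of only updating FE-local state. The regression test added below drives exactly this flow with statements of the following form (job name and offset value are the test's own):

    PAUSE JOB where jobname = 'test_streaming_job_alter_offset_restart_fe_name';
    ALTER JOB test_streaming_job_alter_offset_restart_fe_name
    PROPERTIES (
        'offset' = '{"fileName":"regression/load/data/anoexist1234.csv"}'
    );
    RESUME JOB where jobname = 'test_streaming_job_alter_offset_restart_fe_name';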
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 4f8da0076cf..ac36976553b 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -91,6 +91,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", "get_streaming_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job("ms", "delete_streaming_job");
+BvarLatencyRecorderWithTag g_bvar_ms_reset_streaming_job_offset("ms", "reset_streaming_job_offset");
 BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
@@ -386,6 +387,10 @@ mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_
 mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
 // delete_streaming_job
 mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter("rpc_kv_delete_streaming_job_del_counter",{"instance_id"});
+// reset_streaming_job_offset
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_counter("rpc_kv_reset_streaming_job_offset_get_counter",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_counter("rpc_kv_reset_streaming_job_offset_put_counter",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_counter("rpc_kv_reset_streaming_job_offset_del_counter",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -578,6 +583,10 @@ mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl
 mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
 // delete_streaming_job
 mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes("rpc_kv_delete_streaming_job_del_bytes",{"instance_id"});
+// reset_streaming_job_offset
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_bytes("rpc_kv_reset_streaming_job_offset_get_bytes",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_bytes("rpc_kv_reset_streaming_job_offset_put_bytes",{"instance_id"});
+mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_bytes("rpc_kv_reset_streaming_job_offset_del_bytes",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
 mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 0153248c499..5641a2a30ce 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -607,6 +607,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job;
+extern BvarLatencyRecorderWithTag g_bvar_ms_reset_streaming_job_offset;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -838,6 +839,9 @@ extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -970,6 +974,9 @@ extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_reset_streaming_job_offset_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h
index f84fa396d86..0e5ca28aee6 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -340,6 +340,11 @@ public:
                                DeleteStreamingJobResponse* response,
                                ::google::protobuf::Closure* done) override;
+    void reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                    const ResetStreamingJobOffsetRequest* request,
+                                    ResetStreamingJobOffsetResponse* response,
+                                    ::google::protobuf::Closure* done) override;
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request,
                            ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -861,6 +866,14 @@ public:
         call_impl(&cloud::MetaService::delete_streaming_job, controller, request, response, done);
     }
+    void reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                    const ResetStreamingJobOffsetRequest* request,
+                                    ResetStreamingJobOffsetResponse* response,
+                                    ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::reset_streaming_job_offset, controller, request, response,
+                  done);
+    }
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request,
                            ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index d2bede30fc4..cf7ef31ba68 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -901,6 +901,110 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
     }
 }
 
+void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcController* controller,
+                                                 const ResetStreamingJobOffsetRequest* request,
+                                                 ResetStreamingJobOffsetResponse* response,
+                                                 ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(reset_streaming_job_offset, get, put, del);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(reset_streaming_job_offset)
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string streaming_job_key_str = streaming_job_key({instance_id, db_id, job_id});
+    std::string streaming_job_val;
+
+    // If no offset provided, remove the streaming job progress
+    if (!request->has_offset()) {
+        txn->remove(streaming_job_key_str);
+        LOG(INFO) << "remove streaming_job_key key=" << hex(streaming_job_key_str);
+    } else {
+        // If offset is provided, update the streaming job progress
+        bool prev_existed = true;
+        StreamingTaskCommitAttachmentPB prev_job_info;
+        err = txn->get(streaming_job_key_str, &streaming_job_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                prev_existed = false;
+            } else {
+                code = cast_as<ErrCategory::READ>(err);
+                ss << "failed to get streaming job progress, db_id=" << db_id
+                   << " job_id=" << job_id << " err=" << err;
+                msg = ss.str();
+                return;
+            }
+        }
+
+        if (prev_existed) {
+            if (!prev_job_info.ParseFromString(streaming_job_val)) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse streaming job offset, db_id=" << db_id
+                   << " job_id=" << job_id;
+                msg = ss.str();
+                return;
+            }
+        }
+
+        std::string new_job_val;
+        StreamingTaskCommitAttachmentPB new_job_info;
+
+        // Set the new offset
+        new_job_info.set_offset(request->offset());
+        new_job_info.set_job_id(job_id);
+
+        // Preserve existing statistics if they exist
+        if (prev_existed) {
+            new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
+            new_job_info.set_load_bytes(prev_job_info.load_bytes());
+            new_job_info.set_num_files(prev_job_info.num_files());
+            new_job_info.set_file_bytes(prev_job_info.file_bytes());
+        }
+
+        if (!new_job_info.SerializeToString(&new_job_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            ss << "failed to serialize new streaming job val, db_id=" << db_id
+               << " job_id=" << job_id;
+            msg = ss.str();
+            return;
+        }
+
+        txn->put(streaming_job_key_str, new_job_val);
+        LOG(INFO) << "reset offset, put streaming_job_key key=" << hex(streaming_job_key_str)
+                  << " prev job val: " << prev_job_info.ShortDebugString()
+                  << " new job val: " << new_job_info.ShortDebugString();
+    }
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit streaming job offset, db_id=" << db_id << " job_id=" << job_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
 void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
                                            const DeleteStreamingJobRequest* request,
                                            DeleteStreamingJobResponse* response,
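
In effect the new RPC is an upsert on the job's progress key: with no offset in the request it removes the key, and with an offset it rewrites the stored StreamingTaskCommitAttachmentPB while carrying the accumulated statistics forward. Using the values from the unit test below, the stored record changes roughly as follows (protobuf text-format sketch, not byte-exact):

    # before: written by commit_txn with the streaming attachment
    job_id: 2003 offset: "original_offset" scanned_rows: 1000 load_bytes: 5000 num_files: 10 file_bytes: 8000
    # after: reset_streaming_job_offset with offset = "reset_offset"
    job_id: 2003 offset: "reset_offset" scanned_rows: 1000 load_bytes: 5000 num_files: 10 file_bytes: 8000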
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index a44b5d8537e..d3e2cea089e 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5259,4 +5259,147 @@ TEST(MetaServiceJobTest, DeleteJobKeyRemovesStreamingMeta) {
ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
}
+TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
+ auto meta_service = get_meta_service(false);
+ std::string instance_id = "test_cloud_instance_id";
+ std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+ MOCK_GET_INSTANCE_ID(instance_id);
+ create_and_refresh_instance(meta_service.get(), instance_id);
+
+ int64_t db_id = 1003;
+ int64_t job_id = 2003;
+
+ // First, create a streaming job by committing a txn
+ {
+ CommitTxnRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ // Begin a txn to obtain a valid txn_id and db_id mapping
+ {
+ brpc::Controller cntl_bt;
+ BeginTxnRequest bt_req;
+ BeginTxnResponse bt_res;
+ auto* txn_info = bt_req.mutable_txn_info();
+ txn_info->set_db_id(db_id);
+ txn_info->set_label("streaming_ut_reset_offset");
+ txn_info->add_table_ids(1);
+ txn_info->set_load_job_source_type(
+ LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB);
+ txn_info->set_timeout_ms(36000);
+ meta_service->begin_txn(&cntl_bt, &bt_req, &bt_res, nullptr);
+ ASSERT_EQ(bt_res.status().code(), MetaServiceCode::OK);
+ request.set_txn_id(bt_res.txn_id());
+ request.set_db_id(db_id);
+ }
+
+        TxnCommitAttachmentPB* attachment = request.mutable_commit_attachment();
+        attachment->set_type(TxnCommitAttachmentPB::STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+ StreamingTaskCommitAttachmentPB* streaming_attach =
+ attachment->mutable_streaming_task_txn_commit_attachment();
+ streaming_attach->set_job_id(job_id);
+ streaming_attach->set_offset("original_offset");
+ streaming_attach->set_scanned_rows(1000);
+ streaming_attach->set_load_bytes(5000);
+ streaming_attach->set_num_files(10);
+ streaming_attach->set_file_bytes(8000);
+
+ CommitTxnResponse response;
+ brpc::Controller cntl;
+ meta_service->commit_txn(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Verify initial state
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ request.set_db_id(db_id);
+ request.set_job_id(job_id);
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ EXPECT_TRUE(response.has_commit_attach());
+ EXPECT_EQ(response.commit_attach().offset(), "original_offset");
+ EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+ EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
+ }
+
+ // Test case 1: Reset offset for existing job
+ {
+ ResetStreamingJobOffsetRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ request.set_db_id(db_id);
+ request.set_job_id(job_id);
+ request.set_offset("reset_offset");
+
+ ResetStreamingJobOffsetResponse response;
+ brpc::Controller cntl;
+        meta_service->reset_streaming_job_offset(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Verify offset was reset
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ request.set_db_id(db_id);
+ request.set_job_id(job_id);
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ EXPECT_TRUE(response.has_commit_attach());
+ EXPECT_EQ(response.commit_attach().offset(), "reset_offset");
+ // Other fields should remain unchanged
+ EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+ EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
+ EXPECT_EQ(response.commit_attach().num_files(), 10);
+ EXPECT_EQ(response.commit_attach().file_bytes(), 8000);
+ }
+
+ // Test case 2: Reset offset multiple times
+ {
+ ResetStreamingJobOffsetRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ request.set_db_id(db_id);
+ request.set_job_id(job_id);
+ request.set_offset("second_reset_offset");
+
+ ResetStreamingJobOffsetResponse response;
+ brpc::Controller cntl;
+        meta_service->reset_streaming_job_offset(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ }
+
+ // Verify the second reset
+ {
+ GetStreamingTaskCommitAttachRequest request;
+ request.set_cloud_unique_id(cloud_unique_id);
+ request.set_db_id(db_id);
+ request.set_job_id(job_id);
+
+ GetStreamingTaskCommitAttachResponse response;
+ brpc::Controller cntl;
+        meta_service->get_streaming_task_commit_attach(&cntl, &request, &response, nullptr);
+
+ EXPECT_FALSE(cntl.Failed()) << "Error: " << cntl.ErrorText();
+ EXPECT_EQ(response.status().code(), MetaServiceCode::OK);
+ EXPECT_TRUE(response.has_commit_attach());
+ EXPECT_EQ(response.commit_attach().offset(), "second_reset_offset");
+ }
+}
+
} // namespace doris::cloud
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 027eed539bf..82ac565be39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -448,6 +448,18 @@ public class MetaServiceClient {
.resetRlProgress(request);
}
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.ResetStreamingJobOffsetRequest.Builder builder =
+                    Cloud.ResetStreamingJobOffsetRequest.newBuilder();
+            builder.mergeFrom(request);
+            return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
+                    .resetStreamingJobOffset(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS)
+                .resetStreamingJobOffset(request);
+    }
+
     public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) {
         if (!request.hasCloudUniqueId()) {
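
For orientation, a minimal sketch of how an FE-side caller reaches this client method (it mirrors resetCloudProgress in StreamingInsertJob further down; dbId, jobId, and offsetJson are placeholder variables, not names from this commit):

    Cloud.ResetStreamingJobOffsetRequest request = Cloud.ResetStreamingJobOffsetRequest.newBuilder()
            .setCloudUniqueId(Config.cloud_unique_id)
            .setDbId(dbId)
            .setJobId(jobId)
            .setOffset(offsetJson) // leaving offset unset asks the meta service to remove the stored progress
            .build();
    Cloud.ResetStreamingJobOffsetResponse response =
            MetaServiceProxy.getInstance().resetStreamingJobOffset(request);
    if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
        throw new JobException(response.getStatus().getMsg());
    }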
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 232e544f502..97fa72e4585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -419,6 +419,11 @@ public class MetaServiceProxy {
return w.executeRequest((client) -> client.resetRLProgress(request));
}
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
+            throws RpcException {
+        return w.executeRequest((client) -> client.resetStreamingJobOffset(request));
+    }
+
     public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
         return w.executeRequest((client) -> client.getObjStoreInfo(request));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b4accf1b15e..a21a3c854fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -227,7 +227,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
return offset;
}
-    public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException {
+    public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException, JobException {
List<String> logParts = new ArrayList<>();
// update sql
if (StringUtils.isNotEmpty(alterJobCommand.getSql())) {
@@ -456,30 +456,61 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
setJobStatus(replayJob.getJobStatus());
try {
modifyPropertiesInternal(replayJob.getProperties());
+ // When the pause state is restarted, it also needs to be updated
+ if (Config.isCloudMode()) {
+ replayOnCloudMode();
+ }
} catch (Exception e) {
// should not happen
             log.error("replay modify streaming insert job properties failed, job id: {}", getJobId(), e);
}
setExecuteSql(replayJob.getExecuteSql());
+ setSucceedTaskCount(replayJob.getSucceedTaskCount());
+ setFailedTaskCount(replayJob.getFailedTaskCount());
+ setCanceledTaskCount(replayJob.getCanceledTaskCount());
}
/**
* When updating offset, you need to reset the currentOffset
*/
-    private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException {
+    private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
         StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
             Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
-
if (Config.isCloudMode()) {
- // todo: reset cloud currentOffset
+ resetCloudProgress(offset);
}
}
this.properties.putAll(inputProperties);
this.jobProperties = new StreamingJobProperties(this.properties);
}
+    private void resetCloudProgress(Offset offset) throws JobException {
+        Cloud.ResetStreamingJobOffsetRequest.Builder builder = Cloud.ResetStreamingJobOffsetRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        builder.setDbId(getDbId());
+        builder.setJobId(getJobId());
+        builder.setOffset(offset.toSerializedJson());
+
+        Cloud.ResetStreamingJobOffsetResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().resetStreamingJobOffset(builder.build());
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                log.warn("failed to reset streaming job cloud offset, response: {}", response);
+                if (response.getStatus().getCode() == Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+                    log.warn("not found streaming job offset, response: {}", response);
+                    return;
+                } else {
+                    throw new JobException(response.getStatus().getMsg());
+                }
+            }
+        } catch (RpcException e) {
+            log.info("failed to reset cloud progress, ", e);
+            throw new JobException(e.getMessage());
+        }
+    }
+
@Override
public ShowResultSetMetaData getTaskMetaData() {
return InsertJob.TASK_META_DATA;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1a73cb905c2..8e5794f4787 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1923,6 +1923,18 @@ message DeleteStreamingJobResponse {
optional MetaServiceResponseStatus status = 1;
}
+message ResetStreamingJobOffsetRequest {
+ optional string cloud_unique_id = 1; // For auth
+ optional int64 db_id = 2;
+ optional int64 job_id = 3;
+ optional string offset = 4;
+ optional string request_ip = 5;
+}
+
+message ResetStreamingJobOffsetResponse {
+ optional MetaServiceResponseStatus status = 1;
+}
+
message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
@@ -2133,6 +2145,7 @@ service MetaService {
// streaming job meta
     rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest) returns (GetStreamingTaskCommitAttachResponse);
     rpc delete_streaming_job(DeleteStreamingJobRequest) returns (DeleteStreamingJobResponse);
+    rpc reset_streaming_job_offset(ResetStreamingJobOffsetRequest) returns (ResetStreamingJobOffsetResponse);
// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
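
The offset field is an opaque string to the meta service; for the S3 streaming source exercised by the regression test below, the FE serializes it as a small JSON document, for example:

    {"fileName":"regression/load/data/example_0.csv"}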
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
new file mode 100644
index 00000000000..e6deaeceeb8
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_job_alter_offset_restart_fe", "docker") {
+ def tableName = "test_streaming_job_alter_offset_restart_fe_tbl"
+ def jobName = "test_streaming_job_alter_offset_restart_fe_name"
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ // run in cloud and not cloud
+ options.cloudMode = null
+
+ docker(options) {
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // create streaming job
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES (
+ 'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+        try {
+            // Wait for streaming job to process some data
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // check job status and succeed task count larger than 1
+                        jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+
+ // alter offset
+ sql """
+ ALTER JOB ${jobName}
+ PROPERTIES (
+ 'offset' = '{"fileName":"regression/load/data/anoexist1234.csv"}'
+ )
+ """
+
+        jobInfo = sql """
+            select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+ // Restart FE
+ cluster.restartFrontends()
+ sleep(30000)
+ context.reconnectFe()
+
+ // check is it consistent after restart
+        def jobStatus = sql """
+            select status, SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobstatus: " + jobStatus)
+        assert jobStatus.get(0).get(0) == "PAUSED"
+        jobInfo = sql """
+            select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+ // resume to check whether consumption will resume
+ sql """
+ RESUME JOB where jobname = '${jobName}'
+ """
+ // wait to running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobStatus: " + jobStatus)
+                        // check job status and succeed task count larger than 1
+                        jobStatus.size() == 1 && jobStatus.get(0).get(0) == 'RUNNING' && '2' <= jobStatus.get(0).get(1)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
+        assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+ sql """drop table if exists `${tableName}` force"""
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]