This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5e443c7619e [fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095) (#31168)
5e443c7619e is described below
commit 5e443c7619ea664210b7ca962adf4513ce650223
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Feb 20 23:40:40 2024 +0800
[fix](routine-load) add lock to avoid editlog out of order when concurrent update job (#31095) (#31168)
---
.../doris/load/routineload/RoutineLoadManager.java | 56 ++++++++++++++++++----
1 file changed, 47 insertions(+), 9 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index dbf3d6a09a5..4c66c510126 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -303,12 +303,23 @@ public class RoutineLoadManager implements Writable {
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
- if (pauseRoutineLoadStmt.isAll()) {
- jobs =
checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
- } else {
- RoutineLoadJob routineLoadJob =
checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
- pauseRoutineLoadStmt.getName());
- jobs.add(routineLoadJob);
+ // it needs lock when getting routine load job,
+ // otherwise, it may cause the editLog out of order in the following
scenarios:
+ // thread A: create job and record job meta
+ // thread B: change job state and persist in editlog according to meta
+ // thread A: persist in editlog
+ // which will cause the null pointer exception when replaying editLog
+ readLock();
+ try {
+ if (pauseRoutineLoadStmt.isAll()) {
+ jobs =
checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
+ } else {
+ RoutineLoadJob routineLoadJob =
checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
+ pauseRoutineLoadStmt.getName());
+ jobs.add(routineLoadJob);
+ }
+ } finally {
+ readUnlock();
}
for (RoutineLoadJob routineLoadJob : jobs) {
@@ -369,8 +380,20 @@ public class RoutineLoadManager implements Writable {
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
throws UserException {
- RoutineLoadJob routineLoadJob =
checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
- stopRoutineLoadStmt.getName());
+ RoutineLoadJob routineLoadJob;
+ // it needs lock when getting routine load job,
+ // otherwise, it may cause the editLog out of order in the following
scenarios:
+ // thread A: create job and record job meta
+ // thread B: change job state and persist in editlog according to meta
+ // thread A: persist in editlog
+ // which will cause the null pointer exception when replaying editLog
+ readLock();
+ try {
+ routineLoadJob =
checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
+ stopRoutineLoadStmt.getName());
+ } finally {
+ readUnlock();
+ }
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
"User " + ConnectContext.get().getQualifiedUser() + "
stop routine load job"),
@@ -792,6 +815,9 @@ public class RoutineLoadManager implements Writable {
job.updateState(operation.getJobState(), null, true /* is replay
*/);
} catch (UserException e) {
LOG.error("should not happened", e);
+ } catch (NullPointerException npe) {
+ LOG.error("cannot get job when replaying state change job, which
is unexpected, job id: "
+ + operation.getId());
}
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
.add("current_state", operation.getJobState())
@@ -803,7 +829,19 @@ public class RoutineLoadManager implements Writable {
* Enter of altering a routine load job
*/
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws
UserException {
- RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(),
stmt.getLabel());
+ RoutineLoadJob job;
+ // it needs lock when getting routine load job,
+ // otherwise, it may cause the editLog out of order in the following
scenarios:
+ // thread A: create job and record job meta
+ // thread B: change job state and persist in editlog according to meta
+ // thread A: persist in editlog
+ // which will cause the null pointer exception when replaying editLog
+ readLock();
+ try {
+ job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
+ } finally {
+ readUnlock();
+ }
if (stmt.hasDataSourceProperty()
&&
!stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name()))
{
throw new DdlException("The specified job type is not: "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]