This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a9aebd2d15d [Chore](nereids) remove PauseRoutineLoadStmt (#51832)
a9aebd2d15d is described below
commit a9aebd2d15de8bf8799d0458d7d4443eaae983b8
Author: yaoxiao <[email protected]>
AuthorDate: Tue Jun 24 10:37:10 2025 +0800
[Chore](nereids) remove PauseRoutineLoadStmt (#51832)
---
fe/fe-core/src/main/cup/sql_parser.cup | 11 ----
.../doris/analysis/PauseRoutineLoadStmt.java | 71 ----------------------
.../doris/load/routineload/RoutineLoadManager.java | 44 --------------
.../main/java/org/apache/doris/qe/DdlExecutor.java | 3 -
.../load/routineload/RoutineLoadManagerTest.java | 21 +++----
5 files changed, 10 insertions(+), 140 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 7ba182a2287..c489dd4b6d6 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2722,17 +2722,6 @@ load_property ::=
:}
;
-pause_routine_load_stmt ::=
- KW_PAUSE KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
- {:
- RESULT = new PauseRoutineLoadStmt(jobLabel);
- :}
- | KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD
- {:
- RESULT = new PauseRoutineLoadStmt(null);
- :}
- ;
-
show_create_load_stmt ::=
KW_SHOW KW_CREATE KW_LOAD KW_FOR job_label:jobLabel
{:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
deleted file mode 100644
index cda61b903e8..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.UserException;
-
-import com.google.common.base.Strings;
-
-/*
- Pause routine load by name
-
- syntax:
- PAUSE ROUTINE LOAD [database.]name
- */
-public class PauseRoutineLoadStmt extends DdlStmt implements
NotFallbackInParser {
-
- private final LabelName labelName;
- private String db;
-
- public PauseRoutineLoadStmt(LabelName labelName) {
- this.labelName = labelName;
- }
-
- public boolean isAll() {
- return labelName == null;
- }
-
- public String getName() {
- return labelName.getLabelName();
- }
-
- public String getDbFullName() {
- return db;
- }
-
- @Override
- public void analyze(Analyzer analyzer) throws UserException {
- super.analyze(analyzer);
- if (labelName != null) {
- labelName.analyze(analyzer);
- db = labelName.getDbName();
- } else {
- if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
- }
- db = analyzer.getDefaultDb();
- }
- }
-
- @Override
- public StmtType stmtType() {
- return StmtType.PAUSE;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 5185c62f65f..a3e061723cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -19,7 +19,6 @@ package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
-import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@@ -430,49 +429,6 @@ public class RoutineLoadManager implements Writable {
.build());
}
- public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
- throws UserException {
- List<RoutineLoadJob> jobs = Lists.newArrayList();
- // it needs lock when getting routine load job,
- // otherwise, it may cause the editLog out of order in the following
scenarios:
- // thread A: create job and record job meta
- // thread B: change job state and persist in editlog according to meta
- // thread A: persist in editlog
- // which will cause the null pointer exception when replaying editLog
- readLock();
- try {
- if (pauseRoutineLoadStmt.isAll()) {
- jobs =
checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
- } else {
- RoutineLoadJob routineLoadJob =
checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
- pauseRoutineLoadStmt.getName());
- jobs.add(routineLoadJob);
- }
- } finally {
- readUnlock();
- }
-
- for (RoutineLoadJob routineLoadJob : jobs) {
- try {
- routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
- new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
- "User " +
ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
- false /* not replay */);
- LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId()).add("current_state",
- routineLoadJob.getState()).add("user",
ConnectContext.get().getQualifiedUser()).add("msg",
- "routine load job has been paused by user").build());
- } catch (UserException e) {
- LOG.warn("failed to pause routine load job {}",
routineLoadJob.getName(), e);
- // if user want to pause a certain job and failed, return
error.
- // if user want to pause all possible jobs, skip error jobs.
- if (!pauseRoutineLoadStmt.isAll()) {
- throw e;
- }
- continue;
- }
- }
- }
-
public int getSizeOfIdToRoutineLoadTask() {
int sizeOfTasks = 0;
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5f06bb3ca4d..6f2d953701e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -81,7 +81,6 @@ import org.apache.doris.analysis.DropUserStmt;
import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.DropWorkloadSchedPolicyStmt;
import org.apache.doris.analysis.InstallPluginStmt;
-import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
@@ -167,8 +166,6 @@ public class DdlExecutor {
}
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt)
ddlStmt);
- } else if (ddlStmt instanceof PauseRoutineLoadStmt) {
-
env.getRoutineLoadManager().pauseRoutineLoadJob((PauseRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof AlterRoutineLoadStmt) {
env.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateJobStmt) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 7f5b7e3c65d..9c9c8c82488 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.load.routineload;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.ParseNode;
-import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
@@ -586,10 +585,10 @@ public class RoutineLoadManagerTest {
}
@Test
- public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadStmt
pauseRoutineLoadStmt, @Mocked Env env,
- @Mocked InternalCatalog catalog, @Mocked Database database,
@Mocked Table tbl,
- @Mocked AccessControllerManager accessManager,
- @Mocked ConnectContext connectContext) throws UserException {
+ public void testPauseRoutineLoadJob(@Injectable PauseRoutineLoadCommand
pauseRoutineLoadCommand, @Mocked Env env,
+ @Mocked InternalCatalog catalog,
@Mocked Database database, @Mocked Table tbl,
+ @Mocked AccessControllerManager
accessManager,
+ @Mocked ConnectContext connectContext)
throws UserException {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob
= Maps.newHashMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob =
Maps.newHashMap();
@@ -606,10 +605,10 @@ public class RoutineLoadManagerTest {
new Expectations() {
{
- pauseRoutineLoadStmt.getDbFullName();
+ pauseRoutineLoadCommand.getDbFullName();
minTimes = 0;
result = "";
- pauseRoutineLoadStmt.getName();
+ pauseRoutineLoadCommand.getLabel();
minTimes = 0;
result = "";
env.getInternalCatalog();
@@ -636,7 +635,7 @@ public class RoutineLoadManagerTest {
}
};
- routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
+ routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadCommand);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED,
routineLoadJob.getState());
@@ -1006,9 +1005,9 @@ public class RoutineLoadManagerTest {
@Test
public void testPauseAndResumeAllRoutineLoadJob(@Injectable
PauseRoutineLoadCommand pauseRoutineLoadCommand,
- @Injectable ResumeRoutineLoadCommand resumeRoutineLoadCommand,
@Mocked Env env, @Mocked InternalCatalog catalog,
- @Mocked Database database, @Mocked Table tbl, @Mocked
AccessControllerManager accessManager,
- @Mocked ConnectContext connectContext) throws UserException {
+ @Injectable
ResumeRoutineLoadCommand resumeRoutineLoadCommand, @Mocked Env env, @Mocked
InternalCatalog catalog,
+ @Mocked Database database,
@Mocked Table tbl, @Mocked AccessControllerManager accessManager,
+ @Mocked ConnectContext
connectContext) throws UserException {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob
= Maps.newHashMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob =
Maps.newHashMap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]