SaintBacchus commented on code in PR #16006:
URL: https://github.com/apache/doris/pull/16006#discussion_r1071839021
##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskProcessor.java:
##########
@@ -17,241 +17,139 @@
package org.apache.doris.mtmv;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedView;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.system.SystemInfoService;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.StringReader;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class MTMVTaskProcessor {
private static final Logger LOG =
LogManager.getLogger(MTMVTaskProcessor.class);
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
- private ConnectContext context;
- void process(MTMVTaskContext context) throws Exception {
+ boolean process(MTMVTaskContext context) throws Exception {
String taskId = context.getTask().getTaskId();
long jobId = context.getJob().getId();
- LOG.info("run mtmv logic start, task_id:{}, jobid:{}", taskId, jobId);
- String tableName = context.getTask().getMvName();
- String tmpTableName = genTmpTableName(tableName);
- DatabaseIf db =
Env.getCurrentEnv().getCatalogMgr().getCatalog(InternalCatalog.INTERNAL_CATALOG_NAME)
- .getDbOrAnalysisException(context.getTask().getDbName());
- MaterializedView table = (MaterializedView)
db.getTableOrAnalysisException(tableName);
- if (!table.tryMvTaskLock()) {
- LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}",
taskId, jobId, "get lock fail");
- return;
+ LOG.info("Start to run a MTMV task, taskId={}, jobId={}.", taskId,
jobId);
+
+ String mvName = context.getTask().getMVName();
+ String temporaryMVName = getTemporaryMVName(mvName);
+ Database db = context.getCtx().getEnv().getInternalCatalog()
+ .getDbOrMetaException(context.getTask().getDBName());
+ MaterializedView mv = (MaterializedView)
db.getTableOrAnalysisException(mvName);
+
+ if (!mv.tryLockMVTask()) {
+ LOG.warn("Failed to run the MTMV task, taskId={}, jobId={},
msg={}.", taskId, jobId,
+ "Failed to get the lock");
+ context.getTask().setMessage("Failed to get the lock.");
+ return false;
}
try {
- //step1 create tmp table
- String tmpCreateTableStmt =
genCreateTempMaterializedViewStmt(context, tableName, tmpTableName);
- //check whther tmp table exists, if exists means run mtmv task
failed before, so need to drop it first
- if (db.isTableExist(tmpTableName)) {
- String dropStml = genDropStml(context, tmpTableName);
- ConnectContext dropResult = execSQL(context, dropStml);
- LOG.info("exec drop table stmt, taskid:{}, stmt:{}, ret:{},
msg:{}", taskId, dropStml,
- dropResult.getState(),
dropResult.getState().getInfoMessage());
- }
- ConnectContext createTempTableResult = execSQL(context,
tmpCreateTableStmt);
- LOG.info("exec tmp table stmt, taskid:{}, stmt:{}, ret:{},
msg:{}", taskId, tmpCreateTableStmt,
- createTempTableResult.getState(),
createTempTableResult.getState().getInfoMessage());
- if (createTempTableResult.getState().getStateType() !=
QueryState.MysqlStateType.OK) {
- throw new Throwable("create tmp table failed, sql:" +
tmpCreateTableStmt);
+ // Check whether the temporary materialized view exists, we should
drop the obsolete materialized view first
+ // because it was created by previous tasks which failed to
complete their work.
+ dropMaterializedView(context, temporaryMVName);
+
+ // Step 1: create the temporary materialized view.
+ String createStatement =
generateCreateStatement(mv.clone(temporaryMVName));
+ if (!executeSQL(context, createStatement)) {
+ throw new RuntimeException(
+ "Failed to create the temporary materialized view,
sql=" + createStatement + ".");
}
- //step2 insert data to tmp table
- String insertStmt = genInsertIntoStmt(context, tmpTableName);
- ConnectContext insertDataResult = execSQL(context, insertStmt);
- LOG.info("exec insert into stmt, taskid:{}, stmt:{}, ret:{},
msg:{}, effected_row:{}", taskId, insertStmt,
- insertDataResult.getState(),
insertDataResult.getState().getInfoMessage(),
- insertDataResult.getState().getAffectedRows());
- if (insertDataResult.getState().getStateType() !=
QueryState.MysqlStateType.OK) {
- throw new Throwable("insert data failed, sql:" + insertStmt);
+ // Step 2: insert data to the temporary materialized view.
+ String insertSelectStatement = generateInsertSelectStmt(context,
temporaryMVName);
+ if (!executeSQL(context, insertSelectStatement)) {
+ throw new RuntimeException(
+ "Failed to insert data to the temporary materialized
view, sql=" + insertSelectStatement + ".");
}
-
- //step3 swap tmp table with origin table
- String swapStmt = genSwapStmt(context, tableName, tmpTableName);
- ConnectContext swapResult = execSQL(context, swapStmt);
- LOG.info("exec swap stmt, taskid:{}, stmt:{}, ret:{}, msg:{}",
taskId, swapStmt, swapResult.getState(),
- swapResult.getState().getInfoMessage());
- if (swapResult.getState().getStateType() !=
QueryState.MysqlStateType.OK) {
- throw new Throwable("swap table failed, sql:" + swapStmt);
+ String insertInfoMessage =
context.getCtx().getState().getInfoMessage();
+
+ // Step 3: swap the temporary materialized view with the original
materialized view.
+ String swapStatement = generateSwapStatement(mvName,
temporaryMVName);
+ if (!executeSQL(context, swapStatement)) {
+ throw new RuntimeException(
+ "Failed to swap the temporary materialized view with
the original materialized view, sql="
+ + swapStatement + ".");
}
- //step4 update task info
-
context.getTask().setMessage(insertDataResult.getState().getInfoMessage());
- context.getTask().setState(TaskState.SUCCESS);
- LOG.info("run mtmv task success, task_id:{},jobid:{}", taskId,
jobId);
- } catch (AnalysisException e) {
- LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}",
taskId, jobId, e.getMessage());
- context.getTask().setMessage("run task failed, caused by " +
e.getMessage());
- context.getTask().setState(TaskState.FAILED);
+
+ context.getTask().setMessage(insertInfoMessage);
+ LOG.info("Run MTMV task successfully, taskId={}, jobId={}.",
taskId, jobId);
+ return true;
} catch (Throwable e) {
- LOG.warn("run mtmv task failed, taskid:{}, jobid:{}, msg:{}",
taskId, jobId, e.getMessage());
- context.getTask().setMessage("run task failed, caused by " +
e.getMessage());
- context.getTask().setState(TaskState.FAILED);
+ context.getTask().setMessage(e.getMessage());
Review Comment:
Maybe move the `setMessage` call outside of this method, because in
`MTMVTaskExecutorPool.java` you can still get the runtime exception message.
##########
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutorPool.java:
##########
@@ -53,20 +55,22 @@ public void executeTask(MTMVTaskExecutor taskExecutor) {
isSuccess = taskExecutor.executeTask();
if (isSuccess) {
task.setState(TaskState.SUCCESS);
- } else {
- task.setState(TaskState.FAILED);
+ break;
}
- } catch (Exception ex) {
- LOG.warn("failed to execute task.", ex);
- } finally {
- task.setFinishTime(MTMVUtils.getNowTimeStamp());
+ } catch (Throwable t) {
+ LOG.warn("Failed to execute the task, taskId=" +
task.getTaskId() + ".", t);
}
retryTimes--;
Review Comment:
Maybe add a sleep here to avoid fast retries?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]