This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e217401bf24 [Chore](nereids) Remove createRoutineLoadStmt (#55144)
e217401bf24 is described below

commit e217401bf244ea2614c6c04deb604e3cedaa1ca5
Author: yaoxiao <yx136264...@163.com>
AuthorDate: Tue Aug 26 15:27:40 2025 +0800

    [Chore](nereids) Remove createRoutineLoadStmt (#55144)
---
 .../bloom_parquet_table/bloom_parquet_table        | Bin 58801541 -> 0 bytes
 .../doris/analysis/CreateRoutineLoadStmt.java      | 554 ---------------------
 .../load/routineload/KafkaRoutineLoadJob.java      |  47 +-
 .../doris/load/routineload/RoutineLoadJob.java     | 124 ++---
 .../doris/load/routineload/RoutineLoadManager.java |  33 --
 .../plans/commands/info/CreateRoutineLoadInfo.java |   7 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   6 -
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  36 +-
 .../load/routineload/RoutineLoadManagerTest.java   |  96 +---
 .../persist/AlterRoutineLoadOperationLogTest.java  |   6 +-
 10 files changed, 71 insertions(+), 838 deletions(-)
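
For reference, the deleted CreateRoutineLoadStmt class documented the SQL it used to parse; a minimal CREATE ROUTINE LOAD statement following that syntax comment might look like the sketch below (the job, database, table, topic, and broker values are placeholders, not taken from this commit):

    CREATE ROUTINE LOAD example_db.example_job ON example_tbl
    COLUMNS TERMINATED BY ",",
    COLUMNS (k1, k2, v1 = k1 + k2),
    WHERE k1 > 1
    PROPERTIES
    (
        "desired_concurrent_number" = "1",
        "max_error_number" = "1000"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "127.0.0.1:9092",
        "kafka_topic" = "example_topic"
    );

The statement itself is unchanged by this commit; only the legacy analyzer class is removed in favor of the Nereids CreateRoutineLoadInfo path.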

diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table
deleted file mode 100644
index a861d2dfbe3..00000000000
Binary files a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table and /dev/null differ
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
deleted file mode 100644
index ba647e74a37..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ /dev/null
@@ -1,554 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
-import org.apache.doris.load.RoutineLoadDesc;
-import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.load.routineload.AbstractDataSourceProperties;
-import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
-import org.apache.doris.load.routineload.RoutineLoadJob;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.OriginStatement;
-import org.apache.doris.resource.workloadgroup.WorkloadGroup;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Predicate;
-
-/*
- Create routine Load statement,  continually load data from a streaming app
-
- syntax:
-      CREATE ROUTINE LOAD [database.]name on table
-      [load properties]
-      [PROPERTIES
-      (
-          desired_concurrent_number = xxx,
-          max_error_number = xxx,
-          k1 = v1,
-          ...
-          kn = vn
-      )]
-      FROM type of routine load
-      [(
-          k1 = v1,
-          ...
-          kn = vn
-      )]
-
-      load properties:
-          load property [[,] load property] ...
-
-      load property:
-          column separator | columns_mapping | partitions | where
-
-      column separator:
-          COLUMNS TERMINATED BY xxx
-      columns_mapping:
-          COLUMNS (c1, c2, c3 = c1 + c2)
-      partitions:
-          PARTITIONS (p1, p2, p3)
-      where:
-          WHERE c1 > 1
-
-      type of routine load:
-          KAFKA
-*/
-public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParser {
-    private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class);
-
-    // routine load properties
-    public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
-    public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number";
-    // max error number in ten thousand records
-    public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
-    public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
-    // the following 3 properties limit the time and batch size of a single routine load task
-    public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval";
-    public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows";
-    public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";
-    public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
-
-    public static final String FORMAT = "format"; // the value is csv or json, default is csv
-    public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
-    public static final String JSONPATHS = "jsonpaths";
-    public static final String JSONROOT = "json_root";
-    public static final String NUM_AS_STRING = "num_as_string";
-    public static final String FUZZY_PARSE = "fuzzy_parse";
-
-    public static final String PARTIAL_COLUMNS = "partial_columns";
-
-    public static final String WORKLOAD_GROUP = "workload_group";
-
-    private static final String NAME_TYPE = "ROUTINE LOAD NAME";
-    public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
-    public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
-    public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
-
-    private AbstractDataSourceProperties dataSourceProperties;
-
-
-    private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
-            .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
-            .add(MAX_ERROR_NUMBER_PROPERTY)
-            .add(MAX_FILTER_RATIO_PROPERTY)
-            .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
-            .add(MAX_BATCH_ROWS_PROPERTY)
-            .add(MAX_BATCH_SIZE_PROPERTY)
-            .add(FORMAT)
-            .add(JSONPATHS)
-            .add(STRIP_OUTER_ARRAY)
-            .add(NUM_AS_STRING)
-            .add(FUZZY_PARSE)
-            .add(JSONROOT)
-            .add(LoadStmt.STRICT_MODE)
-            .add(LoadStmt.TIMEZONE)
-            .add(EXEC_MEM_LIMIT_PROPERTY)
-            .add(SEND_BATCH_PARALLELISM)
-            .add(LOAD_TO_SINGLE_TABLET)
-            .add(PARTIAL_COLUMNS)
-            .add(WORKLOAD_GROUP)
-            .add(LoadStmt.KEY_ENCLOSE)
-            .add(LoadStmt.KEY_ESCAPE)
-            .build();
-
-    private final LabelName labelName;
-    private String tableName;
-    private final List<ParseNode> loadPropertyList;
-    private final Map<String, String> jobProperties;
-    private final String typeName;
-
-    // the following variables will be initialized after analyze
-    // -1 as unset, the default value will set in RoutineLoadJob
-    private String name;
-    private String dbName;
-    private RoutineLoadDesc routineLoadDesc;
-    private int desiredConcurrentNum = 1;
-    private long maxErrorNum = -1;
-    private double maxFilterRatio = -1;
-    private long maxBatchIntervalS = -1;
-    private long maxBatchRows = -1;
-    private long maxBatchSizeBytes = -1;
-    private boolean strictMode = true;
-    private long execMemLimit = 2 * 1024 * 1024 * 1024L;
-    private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
-    private int sendBatchParallelism = 1;
-    private boolean loadToSingleTablet = false;
-
-    private FileFormatProperties fileFormatProperties;
-
-    private String workloadGroupName = "";
-
-    /**
-     * support partial columns load(Only Unique Key Columns)
-     */
-    @Getter
-    private boolean isPartialUpdate = false;
-
-    private String comment = "";
-
-    private LoadTask.MergeType mergeType;
-
-    private boolean isMultiTable = false;
-
-    public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L;
-    public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
-    public static final Predicate<Double> MAX_FILTER_RATIO_PRED = (v) -> v >= 0 && v <= 1;
-    public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 1;
-    public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 200000;
-    public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024
-                                                            && v <= (long) (1024 * 1024 * 1024) * 10;
-    public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
-    public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v > 0L;
-
-    public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
-                                 Map<String, String> jobProperties, String typeName,
-                                 Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType,
-                                 String comment) {
-        this.labelName = labelName;
-        if (StringUtils.isBlank(tableName)) {
-            this.isMultiTable = true;
-        }
-        this.tableName = tableName;
-        this.loadPropertyList = loadPropertyList;
-        this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
-        this.typeName = typeName.toUpperCase();
-        this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
-                .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
-        this.mergeType = mergeType;
-        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
-        if (comment != null) {
-            this.comment = comment;
-        }
-        String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
-        fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
-    }
-
-    /*
-     * make stmt by nereids
-     */
-    public CreateRoutineLoadStmt(LabelName labelName, String dbName, String name, String tableName,
-            List<ParseNode> loadPropertyList, OriginStatement origStmt, UserIdentity userIdentity,
-            Map<String, String> jobProperties, String typeName, RoutineLoadDesc routineLoadDesc,
-            int desireTaskConcurrentNum, long maxErrorNum, double maxFilterRatio, long maxBatchIntervalS,
-            long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int sendBatchParallelism, String timezone,
-            String workloadGroupName, boolean loadToSingleTablet, boolean strictMode,
-            boolean isPartialUpdate, AbstractDataSourceProperties dataSourceProperties,
-            FileFormatProperties fileFormatProperties) {
-        this.labelName = labelName;
-        this.dbName = dbName;
-        this.name = name;
-        this.tableName = tableName;
-        this.loadPropertyList = loadPropertyList;
-        this.setOrigStmt(origStmt);
-        this.setUserInfo(userIdentity);
-        this.jobProperties = jobProperties;
-        this.typeName = typeName;
-        this.routineLoadDesc = routineLoadDesc;
-        this.desiredConcurrentNum = desireTaskConcurrentNum;
-        this.maxErrorNum = maxErrorNum;
-        this.maxFilterRatio = maxFilterRatio;
-        this.maxBatchIntervalS = maxBatchIntervalS;
-        this.maxBatchRows = maxBatchRows;
-        this.maxBatchSizeBytes = maxBatchSizeBytes;
-        this.execMemLimit = execMemLimit;
-        this.sendBatchParallelism = sendBatchParallelism;
-        this.timezone = timezone;
-        this.workloadGroupName = workloadGroupName;
-        this.loadToSingleTablet = loadToSingleTablet;
-        this.strictMode = strictMode;
-        this.isPartialUpdate = isPartialUpdate;
-        this.dataSourceProperties = dataSourceProperties;
-        this.fileFormatProperties = fileFormatProperties;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getDBName() {
-        return dbName;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public String getTypeName() {
-        return typeName;
-    }
-
-    public RoutineLoadDesc getRoutineLoadDesc() {
-        return routineLoadDesc;
-    }
-
-    public int getDesiredConcurrentNum() {
-        return desiredConcurrentNum;
-    }
-
-    public long getMaxErrorNum() {
-        return maxErrorNum;
-    }
-
-    public double getMaxFilterRatio() {
-        return maxFilterRatio;
-    }
-
-    public long getMaxBatchIntervalS() {
-        return maxBatchIntervalS;
-    }
-
-    public long getMaxBatchRows() {
-        return maxBatchRows;
-    }
-
-    public long getMaxBatchSize() {
-        return maxBatchSizeBytes;
-    }
-
-    public long getExecMemLimit() {
-        return execMemLimit;
-    }
-
-    public int getSendBatchParallelism() {
-        return sendBatchParallelism;
-    }
-
-    public boolean isLoadToSingleTablet() {
-        return loadToSingleTablet;
-    }
-
-    public boolean isStrictMode() {
-        return strictMode;
-    }
-
-    public String getTimezone() {
-        return timezone;
-    }
-
-    public LoadTask.MergeType getMergeType() {
-        return mergeType;
-    }
-
-    public FileFormatProperties getFileFormatProperties() {
-        return fileFormatProperties;
-    }
-
-    public AbstractDataSourceProperties getDataSourceProperties() {
-        return dataSourceProperties;
-    }
-
-    public String getComment() {
-        return comment;
-    }
-
-    public String getWorkloadGroupName() {
-        return this.workloadGroupName;
-    }
-
-    @Override
-    public void analyze() throws UserException {
-        super.analyze();
-        // check dbName and tableName
-        checkDBTable();
-        // check name
-        try {
-            FeNameFormat.checkCommonName(NAME_TYPE, name);
-        } catch (AnalysisException e) {
-            // 64 is the length of regular expression matching
-            // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX)
-            throw new AnalysisException(e.getMessage()
-                    + " Maybe routine load job name is longer than 64 or 
contains illegal characters");
-        }
-        // check load properties include column separator etc.
-        checkLoadProperties();
-        // check routine load job properties include desired concurrent number etc.
-        checkJobProperties();
-        // check data source properties
-        checkDataSourceProperties();
-        // analyze merge type
-        if (routineLoadDesc != null) {
-            routineLoadDesc.analyze();
-        } else if (mergeType == LoadTask.MergeType.MERGE) {
-            throw new AnalysisException("Excepted DELETE ON clause when merge 
type is MERGE.");
-        }
-    }
-
-    public void checkDBTable() throws AnalysisException {
-        labelName.analyze();
-        dbName = labelName.getDbName();
-        name = labelName.getLabelName();
-        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
-        if (isPartialUpdate && isMultiTable) {
-            throw new AnalysisException("Partial update is not supported in 
multi-table load.");
-        }
-        if (isMultiTable) {
-            return;
-        }
-        if (Strings.isNullOrEmpty(tableName)) {
-            throw new AnalysisException("Table name should not be null");
-        }
-        Table table = db.getTableOrAnalysisException(tableName);
-        if (mergeType != LoadTask.MergeType.APPEND
-                && (table.getType() != Table.TableType.OLAP
-                || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS)) {
-            throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables.");
-        }
-        if (mergeType != LoadTask.MergeType.APPEND
-                && !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) {
-            throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete.");
-        }
-        if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
-            throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
-        }
-    }
-
-    public void checkLoadProperties() throws UserException {
-        Separator columnSeparator = null;
-        // TODO(yangzhengguo01): add line delimiter to properties
-        Separator lineDelimiter = null;
-        ImportColumnsStmt importColumnsStmt = null;
-        ImportWhereStmt precedingImportWhereStmt = null;
-        ImportWhereStmt importWhereStmt = null;
-        ImportSequenceStmt importSequenceStmt = null;
-        PartitionNames partitionNames = null;
-        ImportDeleteOnStmt importDeleteOnStmt = null;
-        if (loadPropertyList != null) {
-            for (ParseNode parseNode : loadPropertyList) {
-                if (parseNode instanceof Separator) {
-                    // check column separator
-                    if (columnSeparator != null) {
-                        throw new AnalysisException("repeat setting of column separator");
-                    }
-                    columnSeparator = (Separator) parseNode;
-                    columnSeparator.analyze();
-                } else if (parseNode instanceof ImportColumnsStmt) {
-                    if (isMultiTable) {
-                        throw new AnalysisException("Multi-table load does not support setting columns info");
-                    }
-                    // check columns info
-                    if (importColumnsStmt != null) {
-                        throw new AnalysisException("repeat setting of columns info");
-                    }
-                    importColumnsStmt = (ImportColumnsStmt) parseNode;
-                } else if (parseNode instanceof ImportWhereStmt) {
-                    // check where expr
-                    ImportWhereStmt node = (ImportWhereStmt) parseNode;
-                    if (node.isPreceding()) {
-                        if (isMultiTable) {
-                            throw new AnalysisException("Multi-table load does not support setting columns info");
-                        }
-                        if (precedingImportWhereStmt != null) {
-                            throw new AnalysisException("repeat setting of preceding where predicate");
-                        }
-                        precedingImportWhereStmt = node;
-                    } else {
-                        if (importWhereStmt != null) {
-                            throw new AnalysisException("repeat setting of where predicate");
-                        }
-                        importWhereStmt = node;
-                    }
-                } else if (parseNode instanceof PartitionNames) {
-                    // check partition names
-                    if (partitionNames != null) {
-                        throw new AnalysisException("repeat setting of partition names");
-                    }
-                    partitionNames = (PartitionNames) parseNode;
-                    partitionNames.analyze();
-                } else if (parseNode instanceof ImportDeleteOnStmt) {
-                    // check delete expr
-                    if (importDeleteOnStmt != null) {
-                        throw new AnalysisException("repeat setting of delete predicate");
-                    }
-                    importDeleteOnStmt = (ImportDeleteOnStmt) parseNode;
-                } else if (parseNode instanceof ImportSequenceStmt) {
-                    // check sequence column
-                    if (importSequenceStmt != null) {
-                        throw new AnalysisException("repeat setting of sequence column");
-                    }
-                    importSequenceStmt = (ImportSequenceStmt) parseNode;
-                }
-            }
-        }
-        routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt,
-                precedingImportWhereStmt, importWhereStmt,
-                partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType,
-                importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
-    }
-
-    private void checkJobProperties() throws UserException {
-        Optional<String> optional = jobProperties.keySet().stream().filter(
-                entity -> !PROPERTIES_SET.contains(entity)).findFirst();
-        if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid property");
-        }
-
-        desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(
-                jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY),
-                Config.max_routine_load_task_concurrent_num, DESIRED_CONCURRENT_NUMBER_PRED,
-                DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater than 0")).intValue();
-
-        maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY),
-                RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED,
-                MAX_ERROR_NUMBER_PROPERTY + " should >= 0");
-
-        maxFilterRatio = Util.getDoublePropertyOrDefault(jobProperties.get(MAX_FILTER_RATIO_PROPERTY),
-                RoutineLoadJob.DEFAULT_MAX_FILTER_RATIO, MAX_FILTER_RATIO_PRED,
-                MAX_FILTER_RATIO_PROPERTY + " should between 0 and 1");
-
-        maxBatchIntervalS = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY),
-                RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, MAX_BATCH_INTERVAL_PRED,
-                MAX_BATCH_INTERVAL_SEC_PROPERTY + " should >= 1");
-
-        maxBatchRows = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY),
-                RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED,
-                MAX_BATCH_ROWS_PROPERTY + " should > 200000");
-
-        maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
-                RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
-                MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB");
-
-        strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
-                RoutineLoadJob.DEFAULT_STRICT_MODE,
-                LoadStmt.STRICT_MODE + " should be a boolean");
-        execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY),
-                RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED,
-                EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0");
-
-        sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
-                ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED,
-                SEND_BATCH_PARALLELISM + " must be greater than 0")).intValue();
-        loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
-                RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
-                LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
-
-        String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
-        if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
-            ConnectContext tmpCtx = new ConnectContext();
-            if (Config.isCloudMode()) {
-                tmpCtx.setCloudCluster(ConnectContext.get().getCloudCluster());
-            }
-            tmpCtx.setCurrentUserIdentity(ConnectContext.get().getCurrentUserIdentity());
-            tmpCtx.getSessionVariable().setWorkloadGroup(inputWorkloadGroupStr);
-            List<WorkloadGroup> wgList = Env.getCurrentEnv().getWorkloadGroupMgr()
-                    .getWorkloadGroup(tmpCtx);
-            if (wgList.size() == 0) {
-                throw new UserException("Can not find workload group " + inputWorkloadGroupStr);
-            }
-            workloadGroupName = inputWorkloadGroupStr;
-        }
-
-        if (ConnectContext.get() != null) {
-            timezone = ConnectContext.get().getSessionVariable().getTimeZone();
-        }
-        timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone));
-
-        fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
-    }
-
-    private void checkDataSourceProperties() throws UserException {
-        this.dataSourceProperties.setTimezone(this.timezone);
-        this.dataSourceProperties.analyze();
-    }
-
-    @Override
-    public StmtType stmtType() {
-        return StmtType.CREATE;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index ee15c69925b..5da8f90a0c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
@@ -524,33 +523,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return kafkaRoutineLoadJob;
     }
 
-    public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws UserException {
-        // check db and table
-        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDBName());
-
-        long id = Env.getCurrentEnv().getNextId();
-        KafkaDataSourceProperties kafkaProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties();
-        KafkaRoutineLoadJob kafkaRoutineLoadJob;
-        if (kafkaProperties.isMultiTable()) {
-            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
-                    db.getId(),
-                    kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo(), true);
-        } else {
-            OlapTable olapTable = db.getOlapTableOrDdlException(stmt.getTableName());
-            checkMeta(olapTable, stmt.getRoutineLoadDesc());
-            long tableId = olapTable.getId();
-            // init kafka routine load job
-            kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
-                    db.getId(), tableId,
-                    kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo());
-        }
-        kafkaRoutineLoadJob.setOptional(stmt);
-        kafkaRoutineLoadJob.checkCustomProperties();
-        kafkaRoutineLoadJob.checkCustomPartition();
-
-        return kafkaRoutineLoadJob;
-    }
-
     private void checkCustomPartition() throws UserException {
         if (customKafkaPartitions.isEmpty()) {
             return;
@@ -646,21 +618,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + UUID.randomUUID());
     }
 
-    @Override
-    protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
-        super.setOptional(stmt);
-        KafkaDataSourceProperties kafkaDataSourceProperties
-                = (KafkaDataSourceProperties) stmt.getDataSourceProperties();
-        if (CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) {
-            setCustomKafkaPartitions(kafkaDataSourceProperties);
-        }
-        if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
-            setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
-        }
-        // set group id if not specified
-        this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + UUID.randomUUID());
-    }
-
     // this is an unprotected method which is called in the initialization function
     private void setCustomKafkaPartitions(KafkaDataSourceProperties kafkaDataSourceProperties) throws LoadException {
 
@@ -822,8 +779,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
             modifyCommonJobProperties(copiedJobProperties);
             this.jobProperties.putAll(copiedJobProperties);
-            if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
-                this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
+            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+                this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
             }
         }
         LOG.info("modify the properties of kafka routine load job: {}, 
jobProperties: {}, datasource properties: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e74e8ff3e43..34cd9e5228f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.LoadStmt;
@@ -420,73 +419,6 @@ public abstract class RoutineLoadJob
         }
     }
 
-    protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
-        setRoutineLoadDesc(stmt.getRoutineLoadDesc());
-        if (stmt.getDesiredConcurrentNum() != -1) {
-            this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum();
-        }
-        if (stmt.getMaxErrorNum() != -1) {
-            this.maxErrorNum = stmt.getMaxErrorNum();
-        }
-        if (stmt.getMaxFilterRatio() != -1) {
-            this.maxFilterRatio = stmt.getMaxFilterRatio();
-        }
-        if (stmt.getMaxBatchIntervalS() != -1) {
-            this.maxBatchIntervalS = stmt.getMaxBatchIntervalS();
-        }
-        if (stmt.getMaxBatchRows() != -1) {
-            this.maxBatchRows = stmt.getMaxBatchRows();
-        }
-        if (stmt.getMaxBatchSize() != -1) {
-            this.maxBatchSizeBytes = stmt.getMaxBatchSize();
-        }
-        if (stmt.getExecMemLimit() != -1) {
-            this.execMemLimit = stmt.getExecMemLimit();
-        }
-        if (stmt.getSendBatchParallelism() > 0) {
-            this.sendBatchParallelism = stmt.getSendBatchParallelism();
-        }
-        if (stmt.isLoadToSingleTablet()) {
-            this.loadToSingleTablet = stmt.isLoadToSingleTablet();
-        }
-        jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone());
-        jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode()));
-        jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
-        jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
-        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? "true" : "false");
-        if (stmt.isPartialUpdate()) {
-            this.isPartialUpdate = true;
-        }
-        jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
-
-        FileFormatProperties fileFormatProperties = stmt.getFileFormatProperties();
-        if (fileFormatProperties instanceof CsvFileFormatProperties) {
-            CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties;
-            jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv");
-            jobProperties.put(LoadStmt.KEY_ENCLOSE, new String(new byte[]{csvFileFormatProperties.getEnclose()}));
-            jobProperties.put(LoadStmt.KEY_ESCAPE, new String(new byte[]{csvFileFormatProperties.getEscape()}));
-            this.enclose = csvFileFormatProperties.getEnclose();
-            this.escape = csvFileFormatProperties.getEscape();
-        } else if (fileFormatProperties instanceof JsonFileFormatProperties) {
-            JsonFileFormatProperties jsonFileFormatProperties = (JsonFileFormatProperties) fileFormatProperties;
-            jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
-            jobProperties.put(JsonFileFormatProperties.PROP_JSON_PATHS, jsonFileFormatProperties.getJsonPaths());
-            jobProperties.put(JsonFileFormatProperties.PROP_JSON_ROOT, jsonFileFormatProperties.getJsonRoot());
-            jobProperties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
-                    String.valueOf(jsonFileFormatProperties.isStripOuterArray()));
-            jobProperties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING,
-                    String.valueOf(jsonFileFormatProperties.isNumAsString()));
-            jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
-                    String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
-        } else {
-            throw new UserException("Invalid format type.");
-        }
-
-        if (!StringUtils.isEmpty(stmt.getWorkloadGroupName())) {
-            jobProperties.put(WORKLOAD_GROUP, stmt.getWorkloadGroupName());
-        }
-    }
-
     protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
         if (routineLoadDesc != null) {
             if (routineLoadDesc.getColumnsInfo() != null) {
@@ -1820,15 +1752,15 @@ public abstract class RoutineLoadJob
         }
         // 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
         sb.append("PROPERTIES\n(\n");
-        appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
+        appendProperties(sb, CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
+        appendProperties(sb, CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
+        appendProperties(sb, CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio, false);
+        appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
+        appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
+        appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
         appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(), false);
         if (isPartialUpdate) {
-            appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false);
+            appendProperties(sb, CreateRoutineLoadInfo.PARTIAL_COLUMNS, isPartialUpdate, false);
         }
         appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS, getJsonPaths(), false);
         appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
@@ -1923,14 +1855,14 @@ public abstract class RoutineLoadJob
                 sequenceCol == null ? STAR_STRING : sequenceCol);
 
         // job properties defined in CreateRoutineLoadStmt
-        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
-        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes));
-        jobProperties.put(CreateRoutineLoadStmt.CURRENT_CONCURRENT_NUMBER_PROPERTY,
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
+        jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes));
+        jobProperties.put(CreateRoutineLoadInfo.CURRENT_CONCURRENT_NUMBER_PROPERTY,
                 String.valueOf(currentTaskConcurrentNum));
-        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
+        jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY,
                 String.valueOf(desireTaskConcurrentNum));
         jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(execMemLimit));
         jobProperties.put(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, mergeType.toString());
@@ -1974,7 +1906,7 @@ public abstract class RoutineLoadJob
             isMultiTable = true;
         }
         jobProperties.forEach((k, v) -> {
-            if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+            if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                 isPartialUpdate = Boolean.parseBoolean(v);
             }
         });
@@ -2015,35 +1947,35 @@ public abstract class RoutineLoadJob
 
     // for ALTER ROUTINE LOAD
     protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
             this.desireTaskConcurrentNum = Integer.parseInt(
-                    jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+                    jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
         }
 
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY)) {
             this.maxErrorNum = Long.parseLong(
-                    jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
+                    jobProperties.remove(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY));
         }
 
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY)) {
             this.maxFilterRatio = Double.parseDouble(
-                    jobProperties.remove(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY));
-            this.jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
+                    jobProperties.remove(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY));
+            this.jobProperties.put(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
         }
 
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
             this.maxBatchIntervalS = Long.parseLong(
-                    jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
+                    jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY));
         }
 
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)) {
             this.maxBatchRows = Long.parseLong(
-                    jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
+                    jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY));
         }
 
-        if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) {
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)) {
             this.maxBatchSizeBytes = Long.parseLong(
-                    jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY));
+                    jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index a22eae6b009..7d5486f5565 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
@@ -203,38 +202,6 @@ public class RoutineLoadManager implements Writable {
                 info.getTableName());
     }
 
-    // cloud override
-    public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
-            throws UserException {
-        // check load auth
-        if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
-                InternalCatalog.INTERNAL_CATALOG_NAME,
-                createRoutineLoadStmt.getDBName(),
-                createRoutineLoadStmt.getTableName(),
-                PrivPredicate.LOAD)) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
-                    ConnectContext.get().getQualifiedUser(),
-                    ConnectContext.get().getRemoteIP(),
-                    createRoutineLoadStmt.getDBName(),
-                    createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
-        }
-
-        RoutineLoadJob routineLoadJob = null;
-        LoadDataSourceType type = LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName());
-        switch (type) {
-            case KAFKA:
-                routineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
-                break;
-            default:
-                throw new UserException("Unknown data source type: " + type);
-        }
-
-        routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
-        routineLoadJob.setComment(createRoutineLoadStmt.getComment());
-        addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(),
-                    createRoutineLoadStmt.getTableName());
-    }
-
     public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
                     throws UserException {
         writeLock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 2e13c3e7e6b..dacc706ea35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -444,7 +444,10 @@ public class CreateRoutineLoadInfo {
             importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName());
     }
 
-    private void checkJobProperties() throws UserException {
+    /**
+     * checkJobProperties
+     */
+    public void checkJobProperties() throws UserException {
         Optional<String> optional = jobProperties.keySet().stream().filter(
                 entity -> !PROPERTIES_SET.contains(entity)).findFirst();
         if (optional.isPresent()) {
@@ -506,7 +509,7 @@ public class CreateRoutineLoadInfo {
             this.workloadGroupName = inputWorkloadGroupStr;
         }
 
-        if (ConnectContext.get() != null) {
+        if (ConnectContext.get().getSessionVariable().getTimeZone() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 3540fc6bb29..0b87a3550bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -17,9 +17,7 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DdlStmt;
 import org.apache.doris.catalog.Env;
@@ -44,10 +42,6 @@ public class DdlExecutor {
             env.createTable((CreateTableStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateMaterializedViewStmt) {
             env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt);
-        } else if (ddlStmt instanceof AlterTableStmt) {
-            env.alterTable((AlterTableStmt) ddlStmt);
-        } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
-            env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
         } else {
             LOG.warn("Unkown statement " + ddlStmt.getClass());
             throw new DdlException("Unknown statement.");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 63452a5d59c..1d8b58257dc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.ImportSequenceStmt;
 import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.UserIdentity;
@@ -42,6 +40,10 @@ import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
 import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
@@ -66,6 +68,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -76,6 +79,7 @@ public class KafkaRoutineLoadJobTest {
     private String jobName = "job1";
     private String dbName = "db1";
     private LabelName labelName = new LabelName(dbName, jobName);
+    private LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);
     private String tableNameString = "table1";
     private String topicName = "topic1";
     private String serverAddress = "http://127.0.0.1:8080";
@@ -246,10 +250,11 @@ public class KafkaRoutineLoadJobTest {
     public void testFromCreateStmt(@Mocked Env env,
                                    @Injectable Database database,
             @Injectable OlapTable table) throws UserException {
-        CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
+        CreateRoutineLoadInfo createRoutineLoadInfo = initCreateRoutineLoadInfo();
+        createRoutineLoadInfo.validate(connectContext);
         RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null,
                 LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
-        Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", 
routineLoadDesc);
+        Deencapsulation.setField(createRoutineLoadInfo, "routineLoadDesc", 
routineLoadDesc);
         List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
         List<PartitionInfo> kafkaPartitionInfoList = Lists.newArrayList();
         for (String s : kafkaPartitionString.split(",")) {
@@ -261,7 +266,7 @@ public class KafkaRoutineLoadJobTest {
         dsProperties.setKafkaPartitionOffsets(partitionIdToOffset);
         Deencapsulation.setField(dsProperties, "brokerList", serverAddress);
         Deencapsulation.setField(dsProperties, "topic", topicName);
-        Deencapsulation.setField(createRoutineLoadStmt, "dataSourceProperties", dsProperties);
+        Deencapsulation.setField(createRoutineLoadInfo, "dataSourceProperties", dsProperties);
 
         long dbId = 1L;
         long tableId = 2L;
@@ -305,7 +310,7 @@ public class KafkaRoutineLoadJobTest {
             }
         };
 
-        KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
+        KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateInfo(createRoutineLoadInfo, connectContext);
         Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName());
         Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId());
         Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId());
@@ -316,12 +321,9 @@ public class KafkaRoutineLoadJobTest {
         Assert.assertEquals(sequenceStmt.getSequenceColName(), kafkaRoutineLoadJob.getSequenceCol());
     }
 
-    private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
-        List<ParseNode> loadPropertyList = new ArrayList<>();
-        loadPropertyList.add(columnSeparator);
-        loadPropertyList.add(partitionNames);
+    private CreateRoutineLoadInfo initCreateRoutineLoadInfo() {
         Map<String, String> properties = Maps.newHashMap();
-        properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
+        properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
         String typeName = LoadDataSourceType.KAFKA.name();
         Map<String, String> customProperties = Maps.newHashMap();
 
@@ -329,11 +331,11 @@ public class KafkaRoutineLoadJobTest {
         customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress);
         customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString);
 
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
-                                                                                loadPropertyList, properties,
-                                                                                typeName, customProperties,
-                                                                                LoadTask.MergeType.APPEND, "");
-        Deencapsulation.setField(createRoutineLoadStmt, "name", jobName);
-        return createRoutineLoadStmt;
+        LoadSeparator loadSeparator = new LoadSeparator(",");
+        Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
+        loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator);
+        CreateRoutineLoadInfo createRoutineLoadInfo = new CreateRoutineLoadInfo(labelNameInfo, tableNameString,
+                loadPropertyMap, properties, typeName, customProperties, LoadTask.MergeType.APPEND, "");
+        return createRoutineLoadInfo;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index dd38d2fbbe7..e86d10d0c8c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
-import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.UserIdentity;
@@ -40,13 +38,16 @@ import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
 import org.apache.doris.nereids.trees.plans.commands.load.PauseRoutineLoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.load.ResumeRoutineLoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.load.StopRoutineLoadCommand;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
@@ -55,7 +56,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import mockit.Expectations;
 import mockit.Injectable;
-import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
 import org.apache.logging.log4j.LogManager;
@@ -65,6 +65,7 @@ import org.junit.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -76,75 +77,6 @@ public class RoutineLoadManagerTest {
     @Mocked
     private SystemInfoService systemInfoService;
 
-    @Test
-    public void testAddJobByStmt(@Injectable AccessControllerManager accessManager,
-            @Injectable TResourceInfo tResourceInfo,
-            @Mocked ConnectContext connectContext,
-            @Mocked Env env) throws UserException {
-        String jobName = "job1";
-        String dbName = "db1";
-        LabelName labelName = new LabelName(dbName, jobName);
-        String tableNameString = "table1";
-        List<ParseNode> loadPropertyList = new ArrayList<>();
-        Separator columnSeparator = new Separator(",");
-        loadPropertyList.add(columnSeparator);
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
-        String typeName = LoadDataSourceType.KAFKA.name();
-        Map<String, String> customProperties = Maps.newHashMap();
-        String topicName = "topic1";
-        customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName);
-        String serverAddress = "http://127.0.0.1:8080";
-        customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
-                                                                                loadPropertyList, properties,
-                                                                                typeName, customProperties,
-                                                                                LoadTask.MergeType.APPEND, "");
-        createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
-
-        KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L,
-                serverAddress, topicName, UserIdentity.ADMIN);
-
-        new MockUp<KafkaRoutineLoadJob>() {
-            @Mock
-            public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) {
-                return kafkaRoutineLoadJob;
-            }
-        };
-
-        new Expectations() {
-            {
-                env.getAccessManager();
-                minTimes = 0;
-                result = accessManager;
-                accessManager.checkTblPriv((ConnectContext) any, anyString, anyString, anyString, PrivPredicate.LOAD);
-                minTimes = 0;
-                result = true;
-            }
-        };
-        RoutineLoadManager routineLoadManager = new RoutineLoadManager();
-        routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
-
-        Map<String, RoutineLoadJob> idToRoutineLoadJob =
-                Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
-        Assert.assertEquals(1, idToRoutineLoadJob.size());
-        RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next();
-        Assert.assertEquals(1L, routineLoadJob.getDbId());
-        Assert.assertEquals(jobName, routineLoadJob.getName());
-        Assert.assertEquals(1L, routineLoadJob.getTableId());
-        Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
-        Assert.assertEquals(true, routineLoadJob instanceof KafkaRoutineLoadJob);
-
-        Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob =
-                Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob");
-        Assert.assertEquals(1, dbToNameToRoutineLoadJob.size());
-        Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next());
-        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L);
-        Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next());
-        Assert.assertEquals(1, nameToRoutineLoadJob.values().size());
-        Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0));
-    }
-
     @Test
     public void testCreateJobAuthDeny(@Injectable AccessControllerManager accessManager,
             @Injectable TResourceInfo tResourceInfo,
@@ -152,25 +84,24 @@ public class RoutineLoadManagerTest {
             @Mocked Env env) {
         String jobName = "job1";
         String dbName = "db1";
-        LabelName labelName = new LabelName(dbName, jobName);
+        LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);
         String tableNameString = "table1";
         List<ParseNode> loadPropertyList = new ArrayList<>();
         Separator columnSeparator = new Separator(",");
         loadPropertyList.add(columnSeparator);
         Map<String, String> properties = Maps.newHashMap();
-        properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
+        properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
         String typeName = LoadDataSourceType.KAFKA.name();
         Map<String, String> customProperties = Maps.newHashMap();
         String topicName = "topic1";
         customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName);
         String serverAddress = "http://127.0.0.1:8080";
         customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress);
-        CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString,
-                                                                                loadPropertyList, properties,
-                                                                                typeName, customProperties,
-                                                                                LoadTask.MergeType.APPEND, "");
-        createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
-
+        LoadSeparator loadSeparator = new LoadSeparator(",");
+        Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
+        loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator);
+        CreateRoutineLoadInfo createRoutineLoadInfo = new 
CreateRoutineLoadInfo(labelNameInfo, tableNameString,
+                loadPropertyMap, properties, typeName, customProperties, 
LoadTask.MergeType.APPEND, "");
 
         new Expectations() {
             {
@@ -184,7 +115,8 @@ public class RoutineLoadManagerTest {
         };
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         try {
-            routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
+            createRoutineLoadInfo.checkJobProperties();
+            routineLoadManager.createRoutineLoadJob(createRoutineLoadInfo, connectContext);
             Assert.fail();
         } catch (LoadException | DdlException e) {
             Assert.fail();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
index a93b4b3a3d2..8a1550d48f5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.persist;
 
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
 
 import com.google.common.collect.Maps;
 import org.junit.Assert;
@@ -48,7 +48,7 @@ public class AlterRoutineLoadOperationLogTest {
 
         long jobId = 1000;
         Map<String, String> jobProperties = Maps.newHashMap();
-        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5");
+        jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5");
 
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "0, 1");
@@ -71,7 +71,7 @@ public class AlterRoutineLoadOperationLogTest {
 
         AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in);
         Assert.assertEquals(1, log2.getJobProperties().size());
-        Assert.assertEquals("5", 
log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+        Assert.assertEquals("5", 
log2.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
         KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) log2.getDataSourceProperties();
         Assert.assertEquals(null, kafkaDataSourceProperties.getBrokerList());
         Assert.assertEquals(null, kafkaDataSourceProperties.getTopic());


