This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e217401bf24 [Chore](nereids) Remove createRoutineLoadStmt (#55144)
e217401bf24 is described below

commit e217401bf244ea2614c6c04deb604e3cedaa1ca5
Author: yaoxiao <yx136264...@163.com>
AuthorDate: Tue Aug 26 15:27:40 2025 +0800

    [Chore](nereids) Remove createRoutineLoadStmt (#55144)
---
 .../bloom_parquet_table/bloom_parquet_table        | Bin 58801541 -> 0 bytes
 .../doris/analysis/CreateRoutineLoadStmt.java      | 554 ---------------------
 .../load/routineload/KafkaRoutineLoadJob.java      |  47 +-
 .../doris/load/routineload/RoutineLoadJob.java     | 124 ++---
 .../doris/load/routineload/RoutineLoadManager.java |  33 --
 .../plans/commands/info/CreateRoutineLoadInfo.java |   7 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   6 -
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  36 +-
 .../load/routineload/RoutineLoadManagerTest.java   |  96 +---
 .../persist/AlterRoutineLoadOperationLogTest.java  |   6 +-
 10 files changed, 71 insertions(+), 838 deletions(-)

diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table
deleted file mode 100644
index a861d2dfbe3..00000000000
Binary files a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table and /dev/null differ
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
deleted file mode 100644
index ba647e74a37..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ /dev/null
@@ -1,554 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -package org.apache.doris.analysis; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.KeysType; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; -import org.apache.doris.common.FeNameFormat; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.property.fileformat.FileFormatProperties; -import org.apache.doris.load.RoutineLoadDesc; -import org.apache.doris.load.loadv2.LoadTask; -import org.apache.doris.load.routineload.AbstractDataSourceProperties; -import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; -import org.apache.doris.load.routineload.RoutineLoadJob; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.OriginStatement; -import org.apache.doris.resource.workloadgroup.WorkloadGroup; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Predicate; - -/* - Create routine Load statement, continually load data from a streaming app - - syntax: - CREATE ROUTINE LOAD [database.]name on table - [load properties] - [PROPERTIES - ( - desired_concurrent_number = xxx, - max_error_number = xxx, - k1 = v1, - ... - kn = vn - )] - FROM type of routine load - [( - k1 = v1, - ... - kn = vn - )] - - load properties: - load property [[,] load property] ... 
- - load property: - column separator | columns_mapping | partitions | where - - column separator: - COLUMNS TERMINATED BY xxx - columns_mapping: - COLUMNS (c1, c2, c3 = c1 + c2) - partitions: - PARTITIONS (p1, p2, p3) - where: - WHERE c1 > 1 - - type of routine load: - KAFKA -*/ -public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParser { - private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class); - - // routine load properties - public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number"; - public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number"; - // max error number in ten thousand records - public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; - public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; - // the following 3 properties limit the time and batch size of a single routine load task - public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval"; - public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows"; - public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size"; - public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit"; - - public static final String FORMAT = "format"; // the value is csv or json, default is csv - public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; - public static final String JSONPATHS = "jsonpaths"; - public static final String JSONROOT = "json_root"; - public static final String NUM_AS_STRING = "num_as_string"; - public static final String FUZZY_PARSE = "fuzzy_parse"; - - public static final String PARTIAL_COLUMNS = "partial_columns"; - - public static final String WORKLOAD_GROUP = "workload_group"; - - private static final String NAME_TYPE = "ROUTINE LOAD NAME"; - public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; - public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; - public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; - - private AbstractDataSourceProperties dataSourceProperties; - - - private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() - .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) - .add(MAX_ERROR_NUMBER_PROPERTY) - .add(MAX_FILTER_RATIO_PROPERTY) - .add(MAX_BATCH_INTERVAL_SEC_PROPERTY) - .add(MAX_BATCH_ROWS_PROPERTY) - .add(MAX_BATCH_SIZE_PROPERTY) - .add(FORMAT) - .add(JSONPATHS) - .add(STRIP_OUTER_ARRAY) - .add(NUM_AS_STRING) - .add(FUZZY_PARSE) - .add(JSONROOT) - .add(LoadStmt.STRICT_MODE) - .add(LoadStmt.TIMEZONE) - .add(EXEC_MEM_LIMIT_PROPERTY) - .add(SEND_BATCH_PARALLELISM) - .add(LOAD_TO_SINGLE_TABLET) - .add(PARTIAL_COLUMNS) - .add(WORKLOAD_GROUP) - .add(LoadStmt.KEY_ENCLOSE) - .add(LoadStmt.KEY_ESCAPE) - .build(); - - private final LabelName labelName; - private String tableName; - private final List<ParseNode> loadPropertyList; - private final Map<String, String> jobProperties; - private final String typeName; - - // the following variables will be initialized after analyze - // -1 as unset, the default value will set in RoutineLoadJob - private String name; - private String dbName; - private RoutineLoadDesc routineLoadDesc; - private int desiredConcurrentNum = 1; - private long maxErrorNum = -1; - private double maxFilterRatio = -1; - private long maxBatchIntervalS = -1; - private long maxBatchRows = -1; - private long maxBatchSizeBytes = -1; - private boolean strictMode = 
true; - private long execMemLimit = 2 * 1024 * 1024 * 1024L; - private String timezone = TimeUtils.DEFAULT_TIME_ZONE; - private int sendBatchParallelism = 1; - private boolean loadToSingleTablet = false; - - private FileFormatProperties fileFormatProperties; - - private String workloadGroupName = ""; - - /** - * support partial columns load(Only Unique Key Columns) - */ - @Getter - private boolean isPartialUpdate = false; - - private String comment = ""; - - private LoadTask.MergeType mergeType; - - private boolean isMultiTable = false; - - public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L; - public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L; - public static final Predicate<Double> MAX_FILTER_RATIO_PRED = (v) -> v >= 0 && v <= 1; - public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 1; - public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 200000; - public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 - && v <= (long) (1024 * 1024 * 1024) * 10; - public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L; - public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v > 0L; - - public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList, - Map<String, String> jobProperties, String typeName, - Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType, - String comment) { - this.labelName = labelName; - if (StringUtils.isBlank(tableName)) { - this.isMultiTable = true; - } - this.tableName = tableName; - this.loadPropertyList = loadPropertyList; - this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties; - this.typeName = typeName.toUpperCase(); - this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory - .createDataSource(typeName, dataSourceProperties, this.isMultiTable); - this.mergeType = mergeType; - this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true"); - if (comment != null) { - this.comment = comment; - } - String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv"); - fileFormatProperties = FileFormatProperties.createFileFormatProperties(format); - } - - /* - * make stmt by nereids - */ - public CreateRoutineLoadStmt(LabelName labelName, String dbName, String name, String tableName, - List<ParseNode> loadPropertyList, OriginStatement origStmt, UserIdentity userIdentity, - Map<String, String> jobProperties, String typeName, RoutineLoadDesc routineLoadDesc, - int desireTaskConcurrentNum, long maxErrorNum, double maxFilterRatio, long maxBatchIntervalS, - long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int sendBatchParallelism, String timezone, - String workloadGroupName, boolean loadToSingleTablet, boolean strictMode, - boolean isPartialUpdate, AbstractDataSourceProperties dataSourceProperties, - FileFormatProperties fileFormatProperties) { - this.labelName = labelName; - this.dbName = dbName; - this.name = name; - this.tableName = tableName; - this.loadPropertyList = loadPropertyList; - this.setOrigStmt(origStmt); - this.setUserInfo(userIdentity); - this.jobProperties = jobProperties; - this.typeName = typeName; - this.routineLoadDesc = routineLoadDesc; - this.desiredConcurrentNum = desireTaskConcurrentNum; - this.maxErrorNum = maxErrorNum; - this.maxFilterRatio = maxFilterRatio; - this.maxBatchIntervalS = maxBatchIntervalS; - this.maxBatchRows = 
maxBatchRows; - this.maxBatchSizeBytes = maxBatchSizeBytes; - this.execMemLimit = execMemLimit; - this.sendBatchParallelism = sendBatchParallelism; - this.timezone = timezone; - this.workloadGroupName = workloadGroupName; - this.loadToSingleTablet = loadToSingleTablet; - this.strictMode = strictMode; - this.isPartialUpdate = isPartialUpdate; - this.dataSourceProperties = dataSourceProperties; - this.fileFormatProperties = fileFormatProperties; - } - - public String getName() { - return name; - } - - public String getDBName() { - return dbName; - } - - public String getTableName() { - return tableName; - } - - public String getTypeName() { - return typeName; - } - - public RoutineLoadDesc getRoutineLoadDesc() { - return routineLoadDesc; - } - - public int getDesiredConcurrentNum() { - return desiredConcurrentNum; - } - - public long getMaxErrorNum() { - return maxErrorNum; - } - - public double getMaxFilterRatio() { - return maxFilterRatio; - } - - public long getMaxBatchIntervalS() { - return maxBatchIntervalS; - } - - public long getMaxBatchRows() { - return maxBatchRows; - } - - public long getMaxBatchSize() { - return maxBatchSizeBytes; - } - - public long getExecMemLimit() { - return execMemLimit; - } - - public int getSendBatchParallelism() { - return sendBatchParallelism; - } - - public boolean isLoadToSingleTablet() { - return loadToSingleTablet; - } - - public boolean isStrictMode() { - return strictMode; - } - - public String getTimezone() { - return timezone; - } - - public LoadTask.MergeType getMergeType() { - return mergeType; - } - - public FileFormatProperties getFileFormatProperties() { - return fileFormatProperties; - } - - public AbstractDataSourceProperties getDataSourceProperties() { - return dataSourceProperties; - } - - public String getComment() { - return comment; - } - - public String getWorkloadGroupName() { - return this.workloadGroupName; - } - - @Override - public void analyze() throws UserException { - super.analyze(); - // check dbName and tableName - checkDBTable(); - // check name - try { - FeNameFormat.checkCommonName(NAME_TYPE, name); - } catch (AnalysisException e) { - // 64 is the length of regular expression matching - // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX) - throw new AnalysisException(e.getMessage() - + " Maybe routine load job name is longer than 64 or contains illegal characters"); - } - // check load properties include column separator etc. - checkLoadProperties(); - // check routine load job properties include desired concurrent number etc. 
- checkJobProperties(); - // check data source properties - checkDataSourceProperties(); - // analyze merge type - if (routineLoadDesc != null) { - routineLoadDesc.analyze(); - } else if (mergeType == LoadTask.MergeType.MERGE) { - throw new AnalysisException("Excepted DELETE ON clause when merge type is MERGE."); - } - } - - public void checkDBTable() throws AnalysisException { - labelName.analyze(); - dbName = labelName.getDbName(); - name = labelName.getLabelName(); - Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); - if (isPartialUpdate && isMultiTable) { - throw new AnalysisException("Partial update is not supported in multi-table load."); - } - if (isMultiTable) { - return; - } - if (Strings.isNullOrEmpty(tableName)) { - throw new AnalysisException("Table name should not be null"); - } - Table table = db.getTableOrAnalysisException(tableName); - if (mergeType != LoadTask.MergeType.APPEND - && (table.getType() != Table.TableType.OLAP - || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS)) { - throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); - } - if (mergeType != LoadTask.MergeType.APPEND - && !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) { - throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete."); - } - if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); - } - } - - public void checkLoadProperties() throws UserException { - Separator columnSeparator = null; - // TODO(yangzhengguo01): add line delimiter to properties - Separator lineDelimiter = null; - ImportColumnsStmt importColumnsStmt = null; - ImportWhereStmt precedingImportWhereStmt = null; - ImportWhereStmt importWhereStmt = null; - ImportSequenceStmt importSequenceStmt = null; - PartitionNames partitionNames = null; - ImportDeleteOnStmt importDeleteOnStmt = null; - if (loadPropertyList != null) { - for (ParseNode parseNode : loadPropertyList) { - if (parseNode instanceof Separator) { - // check column separator - if (columnSeparator != null) { - throw new AnalysisException("repeat setting of column separator"); - } - columnSeparator = (Separator) parseNode; - columnSeparator.analyze(); - } else if (parseNode instanceof ImportColumnsStmt) { - if (isMultiTable) { - throw new AnalysisException("Multi-table load does not support setting columns info"); - } - // check columns info - if (importColumnsStmt != null) { - throw new AnalysisException("repeat setting of columns info"); - } - importColumnsStmt = (ImportColumnsStmt) parseNode; - } else if (parseNode instanceof ImportWhereStmt) { - // check where expr - ImportWhereStmt node = (ImportWhereStmt) parseNode; - if (node.isPreceding()) { - if (isMultiTable) { - throw new AnalysisException("Multi-table load does not support setting columns info"); - } - if (precedingImportWhereStmt != null) { - throw new AnalysisException("repeat setting of preceding where predicate"); - } - precedingImportWhereStmt = node; - } else { - if (importWhereStmt != null) { - throw new AnalysisException("repeat setting of where predicate"); - } - importWhereStmt = node; - } - } else if (parseNode instanceof PartitionNames) { - // check partition names - if (partitionNames != null) { - throw new AnalysisException("repeat setting of partition names"); - } - partitionNames = (PartitionNames) parseNode; - 
partitionNames.analyze(); - } else if (parseNode instanceof ImportDeleteOnStmt) { - // check delete expr - if (importDeleteOnStmt != null) { - throw new AnalysisException("repeat setting of delete predicate"); - } - importDeleteOnStmt = (ImportDeleteOnStmt) parseNode; - } else if (parseNode instanceof ImportSequenceStmt) { - // check sequence column - if (importSequenceStmt != null) { - throw new AnalysisException("repeat setting of sequence column"); - } - importSequenceStmt = (ImportSequenceStmt) parseNode; - } - } - } - routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt, - precedingImportWhereStmt, importWhereStmt, - partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType, - importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName()); - } - - private void checkJobProperties() throws UserException { - Optional<String> optional = jobProperties.keySet().stream().filter( - entity -> !PROPERTIES_SET.contains(entity)).findFirst(); - if (optional.isPresent()) { - throw new AnalysisException(optional.get() + " is invalid property"); - } - - desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault( - jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY), - Config.max_routine_load_task_concurrent_num, DESIRED_CONCURRENT_NUMBER_PRED, - DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater than 0")).intValue(); - - maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY), - RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED, - MAX_ERROR_NUMBER_PROPERTY + " should >= 0"); - - maxFilterRatio = Util.getDoublePropertyOrDefault(jobProperties.get(MAX_FILTER_RATIO_PROPERTY), - RoutineLoadJob.DEFAULT_MAX_FILTER_RATIO, MAX_FILTER_RATIO_PRED, - MAX_FILTER_RATIO_PROPERTY + " should between 0 and 1"); - - maxBatchIntervalS = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY), - RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, MAX_BATCH_INTERVAL_PRED, - MAX_BATCH_INTERVAL_SEC_PROPERTY + " should >= 1"); - - maxBatchRows = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY), - RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED, - MAX_BATCH_ROWS_PROPERTY + " should > 200000"); - - maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY), - RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED, - MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB"); - - strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE), - RoutineLoadJob.DEFAULT_STRICT_MODE, - LoadStmt.STRICT_MODE + " should be a boolean"); - execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY), - RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, - EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0"); - - sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM), - ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED, - SEND_BATCH_PARALLELISM + " must be greater than 0")).intValue(); - loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET), - RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET, - LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean"); - - String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP); - if (!StringUtils.isEmpty(inputWorkloadGroupStr)) { - ConnectContext tmpCtx = new ConnectContext(); - 
if (Config.isCloudMode()) { - tmpCtx.setCloudCluster(ConnectContext.get().getCloudCluster()); - } - tmpCtx.setCurrentUserIdentity(ConnectContext.get().getCurrentUserIdentity()); - tmpCtx.getSessionVariable().setWorkloadGroup(inputWorkloadGroupStr); - List<WorkloadGroup> wgList = Env.getCurrentEnv().getWorkloadGroupMgr() - .getWorkloadGroup(tmpCtx); - if (wgList.size() == 0) { - throw new UserException("Can not find workload group " + inputWorkloadGroupStr); - } - workloadGroupName = inputWorkloadGroupStr; - } - - if (ConnectContext.get() != null) { - timezone = ConnectContext.get().getSessionVariable().getTimeZone(); - } - timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone)); - - fileFormatProperties.analyzeFileFormatProperties(jobProperties, false); - } - - private void checkDataSourceProperties() throws UserException { - this.dataSourceProperties.setTimezone(this.timezone); - this.dataSourceProperties.analyze(); - } - - @Override - public StmtType stmtType() { - return StmtType.CREATE; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index ee15c69925b..5da8f90a0c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -17,7 +17,6 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; @@ -524,33 +523,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { return kafkaRoutineLoadJob; } - public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) throws UserException { - // check db and table - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDBName()); - - long id = Env.getCurrentEnv().getNextId(); - KafkaDataSourceProperties kafkaProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); - KafkaRoutineLoadJob kafkaRoutineLoadJob; - if (kafkaProperties.isMultiTable()) { - kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), - db.getId(), - kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo(), true); - } else { - OlapTable olapTable = db.getOlapTableOrDdlException(stmt.getTableName()); - checkMeta(olapTable, stmt.getRoutineLoadDesc()); - long tableId = olapTable.getId(); - // init kafka routine load job - kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), - db.getId(), tableId, - kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo()); - } - kafkaRoutineLoadJob.setOptional(stmt); - kafkaRoutineLoadJob.checkCustomProperties(); - kafkaRoutineLoadJob.checkCustomPartition(); - - return kafkaRoutineLoadJob; - } - private void checkCustomPartition() throws UserException { if (customKafkaPartitions.isEmpty()) { return; @@ -646,21 +618,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + UUID.randomUUID()); } - @Override - protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { - super.setOptional(stmt); - KafkaDataSourceProperties kafkaDataSourceProperties - = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); - if 
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) { - setCustomKafkaPartitions(kafkaDataSourceProperties); - } - if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) { - setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties()); - } - // set group id if not specified - this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" + UUID.randomUUID()); - } - // this is an unprotected method which is called in the initialization function private void setCustomKafkaPartitions(KafkaDataSourceProperties kafkaDataSourceProperties) throws LoadException { @@ -822,8 +779,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); - if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { - this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS)); + if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { + this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS)); } } LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index e74e8ff3e43..34cd9e5228f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,7 +17,6 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.LoadStmt; @@ -420,73 +419,6 @@ public abstract class RoutineLoadJob } } - protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { - setRoutineLoadDesc(stmt.getRoutineLoadDesc()); - if (stmt.getDesiredConcurrentNum() != -1) { - this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum(); - } - if (stmt.getMaxErrorNum() != -1) { - this.maxErrorNum = stmt.getMaxErrorNum(); - } - if (stmt.getMaxFilterRatio() != -1) { - this.maxFilterRatio = stmt.getMaxFilterRatio(); - } - if (stmt.getMaxBatchIntervalS() != -1) { - this.maxBatchIntervalS = stmt.getMaxBatchIntervalS(); - } - if (stmt.getMaxBatchRows() != -1) { - this.maxBatchRows = stmt.getMaxBatchRows(); - } - if (stmt.getMaxBatchSize() != -1) { - this.maxBatchSizeBytes = stmt.getMaxBatchSize(); - } - if (stmt.getExecMemLimit() != -1) { - this.execMemLimit = stmt.getExecMemLimit(); - } - if (stmt.getSendBatchParallelism() > 0) { - this.sendBatchParallelism = stmt.getSendBatchParallelism(); - } - if (stmt.isLoadToSingleTablet()) { - this.loadToSingleTablet = stmt.isLoadToSingleTablet(); - } - jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone()); - jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode())); - jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism)); - jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet)); - jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, stmt.isPartialUpdate() ? 
"true" : "false"); - if (stmt.isPartialUpdate()) { - this.isPartialUpdate = true; - } - jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); - - FileFormatProperties fileFormatProperties = stmt.getFileFormatProperties(); - if (fileFormatProperties instanceof CsvFileFormatProperties) { - CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties; - jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv"); - jobProperties.put(LoadStmt.KEY_ENCLOSE, new String(new byte[]{csvFileFormatProperties.getEnclose()})); - jobProperties.put(LoadStmt.KEY_ESCAPE, new String(new byte[]{csvFileFormatProperties.getEscape()})); - this.enclose = csvFileFormatProperties.getEnclose(); - this.escape = csvFileFormatProperties.getEscape(); - } else if (fileFormatProperties instanceof JsonFileFormatProperties) { - JsonFileFormatProperties jsonFileFormatProperties = (JsonFileFormatProperties) fileFormatProperties; - jobProperties.put(FileFormatProperties.PROP_FORMAT, "json"); - jobProperties.put(JsonFileFormatProperties.PROP_JSON_PATHS, jsonFileFormatProperties.getJsonPaths()); - jobProperties.put(JsonFileFormatProperties.PROP_JSON_ROOT, jsonFileFormatProperties.getJsonRoot()); - jobProperties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, - String.valueOf(jsonFileFormatProperties.isStripOuterArray())); - jobProperties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, - String.valueOf(jsonFileFormatProperties.isNumAsString())); - jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, - String.valueOf(jsonFileFormatProperties.isFuzzyParse())); - } else { - throw new UserException("Invalid format type."); - } - - if (!StringUtils.isEmpty(stmt.getWorkloadGroupName())) { - jobProperties.put(WORKLOAD_GROUP, stmt.getWorkloadGroupName()); - } - } - protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { if (routineLoadDesc != null) { if (routineLoadDesc.getColumnsInfo() != null) { @@ -1820,15 +1752,15 @@ public abstract class RoutineLoadJob } // 5.job_properties. 
See PROPERTIES_SET of CreateRoutineLoadStmt sb.append("PROPERTIES\n(\n"); - appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); + appendProperties(sb, CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false); + appendProperties(sb, CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false); + appendProperties(sb, CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio, false); + appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false); + appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); + appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(), false); if (isPartialUpdate) { - appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, isPartialUpdate, false); + appendProperties(sb, CreateRoutineLoadInfo.PARTIAL_COLUMNS, isPartialUpdate, false); } appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS, getJsonPaths(), false); appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, isStripOuterArray(), false); @@ -1923,14 +1855,14 @@ public abstract class RoutineLoadJob sequenceCol == null ? 
STAR_STRING : sequenceCol); // job properties defined in CreateRoutineLoadStmt - jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); - jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum)); - jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS)); - jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows)); - jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes)); - jobProperties.put(CreateRoutineLoadStmt.CURRENT_CONCURRENT_NUMBER_PROPERTY, + jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); + jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum)); + jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS)); + jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows)); + jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes)); + jobProperties.put(CreateRoutineLoadInfo.CURRENT_CONCURRENT_NUMBER_PROPERTY, String.valueOf(currentTaskConcurrentNum)); - jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, + jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, String.valueOf(desireTaskConcurrentNum)); jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(execMemLimit)); jobProperties.put(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, mergeType.toString()); @@ -1974,7 +1906,7 @@ public abstract class RoutineLoadJob isMultiTable = true; } jobProperties.forEach((k, v) -> { - if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) { + if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { isPartialUpdate = Boolean.parseBoolean(v); } }); @@ -2015,35 +1947,35 @@ public abstract class RoutineLoadJob // for ALTER ROUTINE LOAD protected void modifyCommonJobProperties(Map<String, String> jobProperties) { - if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { + if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { this.desireTaskConcurrentNum = Integer.parseInt( - jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) { + if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY)) { this.maxErrorNum = Long.parseLong( - jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)); + jobProperties.remove(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY)) { + if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY)) { this.maxFilterRatio = Double.parseDouble( - jobProperties.remove(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY)); - this.jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); + jobProperties.remove(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY)); + this.jobProperties.put(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); } - if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) { + if 
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY)) { this.maxBatchIntervalS = Long.parseLong( - jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)); + jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) { + if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)) { this.maxBatchRows = Long.parseLong( - jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); + jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)); } - if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) { + if (jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)) { this.maxBatchSizeBytes = Long.parseLong( - jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)); + jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index a22eae6b009..7d5486f5565 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -17,7 +17,6 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -203,38 +202,6 @@ public class RoutineLoadManager implements Writable { info.getTableName()); } - // cloud override - public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) - throws UserException { - // check load auth - if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), - InternalCatalog.INTERNAL_CATALOG_NAME, - createRoutineLoadStmt.getDBName(), - createRoutineLoadStmt.getTableName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - createRoutineLoadStmt.getDBName(), - createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName()); - } - - RoutineLoadJob routineLoadJob = null; - LoadDataSourceType type = LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName()); - switch (type) { - case KAFKA: - routineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); - break; - default: - throw new UserException("Unknown data source type: " + type); - } - - routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt()); - routineLoadJob.setComment(createRoutineLoadStmt.getComment()); - addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(), - createRoutineLoadStmt.getTableName()); - } - public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName) throws UserException { writeLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java index 2e13c3e7e6b..dacc706ea35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java @@ -444,7 +444,10 @@ public class CreateRoutineLoadInfo { importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName()); } - private void checkJobProperties() throws UserException { + /** + * checkJobProperties + */ + public void checkJobProperties() throws UserException { Optional<String> optional = jobProperties.keySet().stream().filter( entity -> !PROPERTIES_SET.contains(entity)).findFirst(); if (optional.isPresent()) { @@ -506,7 +509,7 @@ public class CreateRoutineLoadInfo { this.workloadGroupName = inputWorkloadGroupStr; } - if (ConnectContext.get() != null) { + if (ConnectContext.get().getSessionVariable().getTimeZone() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); } timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 3540fc6bb29..0b87a3550bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -17,9 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DdlStmt; import org.apache.doris.catalog.Env; @@ -44,10 +42,6 @@ public class DdlExecutor { env.createTable((CreateTableStmt) ddlStmt); } else if (ddlStmt instanceof CreateMaterializedViewStmt) { env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt); - } else if (ddlStmt instanceof AlterTableStmt) { - env.alterTable((AlterTableStmt) ddlStmt); - } else if (ddlStmt instanceof CreateRoutineLoadStmt) { - env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); } else { LOG.warn("Unkown statement " + ddlStmt.getClass()); throw new DdlException("Unknown statement."); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 63452a5d59c..1d8b58257dc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,10 +17,8 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.ImportSequenceStmt; import org.apache.doris.analysis.LabelName; -import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.UserIdentity; @@ -42,6 +40,10 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.mysql.privilege.MockedAuth; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; +import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; import 
org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; @@ -66,6 +68,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -76,6 +79,7 @@ public class KafkaRoutineLoadJobTest { private String jobName = "job1"; private String dbName = "db1"; private LabelName labelName = new LabelName(dbName, jobName); + private LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); private String tableNameString = "table1"; private String topicName = "topic1"; private String serverAddress = "http://127.0.0.1:8080"; @@ -246,10 +250,11 @@ public class KafkaRoutineLoadJobTest { public void testFromCreateStmt(@Mocked Env env, @Injectable Database database, @Injectable OlapTable table) throws UserException { - CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + CreateRoutineLoadInfo createRoutineLoadInfo = initCreateRoutineLoadInfo(); + createRoutineLoadInfo.validate(connectContext); RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, null, null, partitionNames, null, LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName()); - Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + Deencapsulation.setField(createRoutineLoadInfo, "routineLoadDesc", routineLoadDesc); List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList(); List<PartitionInfo> kafkaPartitionInfoList = Lists.newArrayList(); for (String s : kafkaPartitionString.split(",")) { @@ -261,7 +266,7 @@ public class KafkaRoutineLoadJobTest { dsProperties.setKafkaPartitionOffsets(partitionIdToOffset); Deencapsulation.setField(dsProperties, "brokerList", serverAddress); Deencapsulation.setField(dsProperties, "topic", topicName); - Deencapsulation.setField(createRoutineLoadStmt, "dataSourceProperties", dsProperties); + Deencapsulation.setField(createRoutineLoadInfo, "dataSourceProperties", dsProperties); long dbId = 1L; long tableId = 2L; @@ -305,7 +310,7 @@ public class KafkaRoutineLoadJobTest { } }; - KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateInfo(createRoutineLoadInfo, connectContext); Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName()); Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId()); Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId()); @@ -316,12 +321,9 @@ public class KafkaRoutineLoadJobTest { Assert.assertEquals(sequenceStmt.getSequenceColName(), kafkaRoutineLoadJob.getSequenceCol()); } - private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { - List<ParseNode> loadPropertyList = new ArrayList<>(); - loadPropertyList.add(columnSeparator); - loadPropertyList.add(partitionNames); + private CreateRoutineLoadInfo initCreateRoutineLoadInfo() { Map<String, String> properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); String typeName = LoadDataSourceType.KAFKA.name(); Map<String, String> customProperties = Maps.newHashMap(); @@ -329,11 +331,11 @@ public class KafkaRoutineLoadJobTest { customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), 
kafkaPartitionString); - CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, - loadPropertyList, properties, - typeName, customProperties, - LoadTask.MergeType.APPEND, ""); - Deencapsulation.setField(createRoutineLoadStmt, "name", jobName); - return createRoutineLoadStmt; + LoadSeparator loadSeparator = new LoadSeparator(","); + Map<String, LoadProperty> loadPropertyMap = new HashMap<>(); + loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator); + CreateRoutineLoadInfo createRoutineLoadInfo = new CreateRoutineLoadInfo(labelNameInfo, tableNameString, + loadPropertyMap, properties, typeName, customProperties, LoadTask.MergeType.APPEND, ""); + return createRoutineLoadInfo; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index dd38d2fbbe7..e86d10d0c8c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -17,8 +17,6 @@ package org.apache.doris.load.routineload; -import org.apache.doris.analysis.CreateRoutineLoadStmt; -import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.UserIdentity; @@ -40,13 +38,16 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; +import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; import org.apache.doris.nereids.trees.plans.commands.load.PauseRoutineLoadCommand; import org.apache.doris.nereids.trees.plans.commands.load.ResumeRoutineLoadCommand; import org.apache.doris.nereids.trees.plans.commands.load.StopRoutineLoadCommand; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; @@ -55,7 +56,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import mockit.Expectations; import mockit.Injectable; -import mockit.Mock; import mockit.MockUp; import mockit.Mocked; import org.apache.logging.log4j.LogManager; @@ -65,6 +65,7 @@ import org.junit.Test; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -76,75 +77,6 @@ public class RoutineLoadManagerTest { @Mocked private SystemInfoService systemInfoService; - @Test - public void testAddJobByStmt(@Injectable AccessControllerManager accessManager, - @Injectable TResourceInfo tResourceInfo, - @Mocked ConnectContext connectContext, - @Mocked Env env) throws UserException { - String jobName = "job1"; - String dbName = "db1"; - LabelName labelName = new LabelName(dbName, jobName); - String tableNameString = "table1"; - List<ParseNode> loadPropertyList 
= new ArrayList<>(); - Separator columnSeparator = new Separator(","); - loadPropertyList.add(columnSeparator); - Map<String, String> properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); - String typeName = LoadDataSourceType.KAFKA.name(); - Map<String, String> customProperties = Maps.newHashMap(); - String topicName = "topic1"; - customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); - String serverAddress = "http://127.0.0.1:8080"; - customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); - CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, - loadPropertyList, properties, - typeName, customProperties, - LoadTask.MergeType.APPEND, ""); - createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); - - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, 1L, 1L, - serverAddress, topicName, UserIdentity.ADMIN); - - new MockUp<KafkaRoutineLoadJob>() { - @Mock - public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) { - return kafkaRoutineLoadJob; - } - }; - - new Expectations() { - { - env.getAccessManager(); - minTimes = 0; - result = accessManager; - accessManager.checkTblPriv((ConnectContext) any, anyString, anyString, anyString, PrivPredicate.LOAD); - minTimes = 0; - result = true; - } - }; - RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); - - Map<String, RoutineLoadJob> idToRoutineLoadJob = - Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); - Assert.assertEquals(1, idToRoutineLoadJob.size()); - RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next(); - Assert.assertEquals(1L, routineLoadJob.getDbId()); - Assert.assertEquals(jobName, routineLoadJob.getName()); - Assert.assertEquals(1L, routineLoadJob.getTableId()); - Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); - Assert.assertEquals(true, routineLoadJob instanceof KafkaRoutineLoadJob); - - Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = - Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); - Assert.assertEquals(1, dbToNameToRoutineLoadJob.size()); - Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next()); - Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L); - Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next()); - Assert.assertEquals(1, nameToRoutineLoadJob.values().size()); - Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0)); - } - @Test public void testCreateJobAuthDeny(@Injectable AccessControllerManager accessManager, @Injectable TResourceInfo tResourceInfo, @@ -152,25 +84,24 @@ public class RoutineLoadManagerTest { @Mocked Env env) { String jobName = "job1"; String dbName = "db1"; - LabelName labelName = new LabelName(dbName, jobName); + LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName); String tableNameString = "table1"; List<ParseNode> loadPropertyList = new ArrayList<>(); Separator columnSeparator = new Separator(","); loadPropertyList.add(columnSeparator); Map<String, String> properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, 
"2"); String typeName = LoadDataSourceType.KAFKA.name(); Map<String, String> customProperties = Maps.newHashMap(); String topicName = "topic1"; customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); String serverAddress = "http://127.0.0.1:8080"; customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); - CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, - loadPropertyList, properties, - typeName, customProperties, - LoadTask.MergeType.APPEND, ""); - createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0)); - + LoadSeparator loadSeparator = new LoadSeparator(","); + Map<String, LoadProperty> loadPropertyMap = new HashMap<>(); + loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator); + CreateRoutineLoadInfo createRoutineLoadInfo = new CreateRoutineLoadInfo(labelNameInfo, tableNameString, + loadPropertyMap, properties, typeName, customProperties, LoadTask.MergeType.APPEND, ""); new Expectations() { { @@ -184,7 +115,8 @@ public class RoutineLoadManagerTest { }; RoutineLoadManager routineLoadManager = new RoutineLoadManager(); try { - routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt); + createRoutineLoadInfo.checkJobProperties(); + routineLoadManager.createRoutineLoadJob(createRoutineLoadInfo, connectContext); Assert.fail(); } catch (LoadException | DdlException e) { Assert.fail(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index a93b4b3a3d2..8a1550d48f5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -17,11 +17,11 @@ package org.apache.doris.persist; -import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import com.google.common.collect.Maps; import org.junit.Assert; @@ -48,7 +48,7 @@ public class AlterRoutineLoadOperationLogTest { long jobId = 1000; Map<String, String> jobProperties = Maps.newHashMap(); - jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); + jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); Map<String, String> dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "0, 1"); @@ -71,7 +71,7 @@ public class AlterRoutineLoadOperationLogTest { AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in); Assert.assertEquals(1, log2.getJobProperties().size()); - Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); + Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) log2.getDataSourceProperties(); Assert.assertEquals(null, kafkaDataSourceProperties.getBrokerList()); Assert.assertEquals(null, kafkaDataSourceProperties.getTopic()); --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org