This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 90bae169c05 [Chore](nereids) remove LoadStmt (#55538)
90bae169c05 is described below
commit 90bae169c05e5ddc75b87910922eab2736c1a2ee
Author: yaoxiao <[email protected]>
AuthorDate: Mon Sep 8 10:28:12 2025 +0800
[Chore](nereids) remove LoadStmt (#55538)
---
.../apache/doris/analysis/CopyIntoProperties.java | 3 +-
.../org/apache/doris/analysis/CopyProperties.java | 21 +-
.../java/org/apache/doris/analysis/LoadStmt.java | 647 ---------------------
.../apache/doris/cloud/load/CloudLoadManager.java | 9 -
.../doris/job/extensions/insert/InsertJob.java | 10 +-
.../org/apache/doris/load/loadv2/BulkLoadJob.java | 32 -
.../java/org/apache/doris/load/loadv2/LoadJob.java | 69 +--
.../org/apache/doris/load/loadv2/LoadManager.java | 53 --
.../apache/doris/load/loadv2/MysqlLoadManager.java | 48 +-
.../doris/load/routineload/RoutineLoadJob.java | 24 +-
.../nereids/load/NereidsRoutineLoadTaskInfo.java | 6 +-
.../trees/plans/commands/ExportCommand.java | 7 +-
.../nereids/trees/plans/commands/LoadCommand.java | 3 +-
.../trees/plans/commands/info/CopyIntoInfo.java | 4 +-
.../doris/load/loadv2/BrokerLoadJobTest.java | 123 ----
.../org/apache/doris/load/loadv2/LoadJobTest.java | 12 +-
.../apache/doris/persist/LoadJobV2PersistTest.java | 4 +-
17 files changed, 106 insertions(+), 969 deletions(-)
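Note for reviewers: the patch is almost entirely mechanical. Call sites that referenced constants or helpers on the deleted legacy analyzer class LoadStmt now reference the same-named members on the Nereids LoadCommand, and the unchanged call sites imply the property key strings themselves are untouched. A minimal sketch of the migration pattern (illustrative only, not part of the patch; it assumes LoadCommand keeps the string values LoadStmt defined, e.g. "timeout" and "max_filter_ratio"):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.nereids.trees.plans.commands.LoadCommand;

    public class LoadPropsSketch {
        public static Map<String, String> exampleProps() {
            Map<String, String> props = new HashMap<>();
            // was LoadStmt.TIMEOUT_PROPERTY ("timeout")
            props.put(LoadCommand.TIMEOUT_PROPERTY, "3600");
            // was LoadStmt.MAX_FILTER_RATIO_PROPERTY ("max_filter_ratio")
            props.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, "0.1");
            return props;
        }
    }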
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
index 792fc6be901..510a8c3b6f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
@@ -18,6 +18,7 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import com.google.common.collect.ImmutableSet;
@@ -70,7 +71,7 @@ public class CopyIntoProperties extends CopyProperties {
public Map<String, String> getExecProperties() {
Map<String, String> results = getKeysProperties(EXEC_PROPERTIES);
- results.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(getMaxFilterRatio()));
+ results.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, String.valueOf(getMaxFilterRatio()));
return results;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
index d70976d3f3b..5d973718c62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.commons.lang3.StringUtils;
@@ -33,14 +34,14 @@ public class CopyProperties {
// properties for type, compression, column_separator
public static final String TYPE = FILE_PREFIX + "type";
public static final String COMPRESSION = FILE_PREFIX + "compression";
- public static final String COLUMN_SEPARATOR = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR;
+ public static final String COLUMN_SEPARATOR = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_COLUMN_SEPARATOR;
// properties for data desc
- public static final String LINE_DELIMITER = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_LINE_DELIMITER;
- public static final String PARAM_STRIP_OUTER_ARRAY = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY;
- public static final String PARAM_FUZZY_PARSE = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_FUZZY_PARSE;
- public static final String PARAM_NUM_AS_STRING = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_NUM_AS_STRING;
- public static final String PARAM_JSONPATHS = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_JSONPATHS;
- public static final String PARAM_JSONROOT = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_JSONROOT;
+ public static final String LINE_DELIMITER = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_LINE_DELIMITER;
+ public static final String PARAM_STRIP_OUTER_ARRAY = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_STRIP_OUTER_ARRAY;
+ public static final String PARAM_FUZZY_PARSE = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_FUZZY_PARSE;
+ public static final String PARAM_NUM_AS_STRING = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_NUM_AS_STRING;
+ public static final String PARAM_JSONPATHS = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_JSONPATHS;
+ public static final String PARAM_JSONROOT = FILE_PREFIX + LoadCommand.KEY_IN_PARAM_JSONROOT;
public static final String COPY_PREFIX = "copy.";
// property for size limit, async, on_error
@@ -49,9 +50,9 @@ public class CopyProperties {
public static final String ON_ERROR = COPY_PREFIX + "on_error";
public static final String ON_ERROR_CONTINUE = "continue";
public static final String ON_ERROR_ABORT_STATEMENT = "abort_statement";
- public static final String ON_ERROR_MAX_FILTER_RATIO = LoadStmt.MAX_FILTER_RATIO_PROPERTY + "_";
- public static final String STRICT_MODE = COPY_PREFIX + LoadStmt.STRICT_MODE;
- public static final String LOAD_PARALLELISM = COPY_PREFIX + LoadStmt.LOAD_PARALLELISM;
+ public static final String ON_ERROR_MAX_FILTER_RATIO = LoadCommand.MAX_FILTER_RATIO_PROPERTY + "_";
+ public static final String STRICT_MODE = COPY_PREFIX + LoadCommand.STRICT_MODE;
+ public static final String LOAD_PARALLELISM = COPY_PREFIX + LoadCommand.LOAD_PARALLELISM;
// If 'copy.force' is true, load files to table without checking if files have been loaded, and copy job will not
// be recorded in meta service. So it may cause one file is copied to a table many times.
public static final String FORCE = COPY_PREFIX + "force";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
deleted file mode 100644
index 0ea39c56a85..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ /dev/null
@@ -1,647 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.security.SecurityChecker;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.InternalErrorCode;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
-import org.apache.doris.load.EtlJobType;
-import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-// LOAD statement, load files into tables.
-//
-// syntax:
-// LOAD LABEL load_label
-// (data_desc, ...)
-// [broker_desc]
-// [resource_desc]
-// [PROPERTIES (key1=value1, )]
-//
-// load_label:
-// db_name.label_name
-//
-// data_desc:
-// DATA INFILE ('file_path', ...)
-// [NEGATIVE]
-// INTO TABLE tbl_name
-// [PARTITION (p1, p2)]
-// [COLUMNS TERMINATED BY separator ]
-// [(col1, ...)]
-// [SET (k1=f1(xx), k2=f2(xx))]
-//
-// broker_desc:
-// WITH BROKER name
-// (key2=value2, ...)
-//
-// resource_desc:
-// WITH RESOURCE name
-// (key3=value3, ...)
-public class LoadStmt extends DdlStmt implements NotFallbackInParser {
- private static final Logger LOG = LogManager.getLogger(LoadStmt.class);
-
- public static final String TIMEOUT_PROPERTY = "timeout";
- public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
- public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
- public static final String CLUSTER_PROPERTY = "cluster";
- public static final String STRICT_MODE = "strict_mode";
- public static final String TIMEZONE = "timezone";
- public static final String LOAD_PARALLELISM = "load_parallelism";
- public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
- public static final String PRIORITY = "priority";
- public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
-
- // deprecated, keeping this property to make LoadStmt#checkProperties() happy
- public static final String USE_NEW_LOAD_SCAN_NODE = "use_new_load_scan_node";
-
- // for load data from Baidu Object Store(BOS) todo wait new property support
- public static final String BOS_ENDPOINT = "bos_endpoint";
- public static final String BOS_ACCESSKEY = "bos_accesskey";
- public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";
-
- // mini load params
- public static final String KEY_IN_PARAM_COLUMNS = "columns";
- public static final String KEY_IN_PARAM_SET = "set";
- public static final String KEY_IN_PARAM_HLL = "hll";
- public static final String KEY_IN_PARAM_COLUMN_SEPARATOR = "column_separator";
- public static final String KEY_IN_PARAM_LINE_DELIMITER = "line_delimiter";
- public static final String KEY_IN_PARAM_PARTITIONS = "partitions";
- public static final String KEY_IN_PARAM_FORMAT_TYPE = "format";
-
- public static final String KEY_IN_PARAM_WHERE = "where";
- public static final String KEY_IN_PARAM_MAX_FILTER_RATIO = "max_filter_ratio";
- public static final String KEY_IN_PARAM_TIMEOUT = "timeout";
- public static final String KEY_IN_PARAM_TEMP_PARTITIONS = "temporary_partitions";
- public static final String KEY_IN_PARAM_NEGATIVE = "negative";
- public static final String KEY_IN_PARAM_STRICT_MODE = "strict_mode";
- public static final String KEY_IN_PARAM_TIMEZONE = "timezone";
- public static final String KEY_IN_PARAM_EXEC_MEM_LIMIT = "exec_mem_limit";
- public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths";
- public static final String KEY_IN_PARAM_JSONROOT = "json_root";
- public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array";
- public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
- public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string";
- public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
- public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
- public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column";
- public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col";
- public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id";
- public static final String KEY_SKIP_LINES = "skip_lines";
- public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
- public static final String PARTIAL_COLUMNS = "partial_columns";
- public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
-
- public static final String KEY_COMMENT = "comment";
-
- public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
-
- public static final String KEY_ENCLOSE = "enclose";
-
- public static final String KEY_ESCAPE = "escape";
-
- private final LabelName label;
- private final List<DataDescription> dataDescriptions;
- private final BrokerDesc brokerDesc;
- private final ResourceDesc resourceDesc;
- private final Map<String, String> properties;
- private String user;
-
- private boolean isMysqlLoad = false;
-
- private EtlJobType etlJobType = EtlJobType.UNKNOWN;
-
- private String comment;
-
- public static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
- .put(TIMEOUT_PROPERTY, new Function<String, Long>() {
- @Override
- public @Nullable Long apply(@Nullable String s) {
- return Long.valueOf(s);
- }
- })
- .put(MAX_FILTER_RATIO_PROPERTY, new Function<String, Double>() {
- @Override
- public @Nullable Double apply(@Nullable String s) {
- return Double.valueOf(s);
- }
- })
- .put(EXEC_MEM_LIMIT, new Function<String, Long>() {
- @Override
- public @Nullable Long apply(@Nullable String s) {
- return Long.valueOf(s);
- }
- })
- .put(STRICT_MODE, new Function<String, Boolean>() {
- @Override
- public @Nullable Boolean apply(@Nullable String s) {
- return Boolean.valueOf(s);
- }
- })
- .put(PARTIAL_COLUMNS, new Function<String, Boolean>() {
- @Override
- public @Nullable Boolean apply(@Nullable String s) {
- return Boolean.valueOf(s);
- }
- })
- .put(PARTIAL_UPDATE_NEW_KEY_POLICY, new Function<String, TPartialUpdateNewRowPolicy>() {
- @Override
- public @Nullable TPartialUpdateNewRowPolicy apply(@Nullable String s) {
- return TPartialUpdateNewRowPolicy.valueOf(s.toUpperCase());
- }
- })
- .put(TIMEZONE, new Function<String, String>() {
- @Override
- public @Nullable String apply(@Nullable String s) {
- return s;
- }
- })
- .put(LOAD_PARALLELISM, new Function<String, Integer>() {
- @Override
- public @Nullable Integer apply(@Nullable String s) {
- return Integer.valueOf(s);
- }
- })
- .put(SEND_BATCH_PARALLELISM, new Function<String, Integer>() {
- @Override
- public @Nullable Integer apply(@Nullable String s) {
- return Integer.valueOf(s);
- }
- })
- .put(CLUSTER_PROPERTY, new Function<String, String>() {
- @Override
- public @Nullable String apply(@Nullable String s) {
- return s;
- }
- })
- .put(LOAD_TO_SINGLE_TABLET, new Function<String, Boolean>() {
- @Override
- public @Nullable Boolean apply(@Nullable String s) {
- return Boolean.valueOf(s);
- }
- })
- .put(USE_NEW_LOAD_SCAN_NODE, new Function<String, Boolean>() {
- @Override
- public @Nullable Boolean apply(@Nullable String s) {
- return Boolean.valueOf(s);
- }
- })
- .put(KEY_SKIP_LINES, new Function<String, Integer>() {
- @Override
- public @Nullable Integer apply(@Nullable String s) {
- return Integer.valueOf(s);
- }
- })
- .put(KEY_TRIM_DOUBLE_QUOTES, new Function<String, Boolean>() {
- @Override
- public @Nullable Boolean apply(@Nullable String s) {
- return Boolean.valueOf(s);
- }
- })
- .put(PRIORITY, (Function<String, LoadTask.Priority>) s -> LoadTask.Priority.valueOf(s))
- .build();
-
- public LoadStmt(DataDescription dataDescription, Map<String, String> properties, String comment) {
- this.label = new LabelName();
- this.dataDescriptions = Lists.newArrayList(dataDescription);
- this.brokerDesc = null;
- this.resourceDesc = null;
- this.properties = properties;
- this.user = null;
- this.isMysqlLoad = true;
- if (comment != null) {
- this.comment = comment;
- } else {
- this.comment = "";
- }
- }
-
- public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
- BrokerDesc brokerDesc, Map<String, String> properties, String comment) {
- this.label = label;
- this.dataDescriptions = dataDescriptions;
- this.brokerDesc = brokerDesc;
- this.resourceDesc = null;
- this.properties = properties;
- this.user = null;
- if (comment != null) {
- this.comment = comment;
- } else {
- this.comment = "";
- }
- }
-
- public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
- ResourceDesc resourceDesc, Map<String, String> properties, String comment) {
- this.label = label;
- this.dataDescriptions = dataDescriptions;
- this.brokerDesc = null;
- this.resourceDesc = resourceDesc;
- this.properties = properties;
- this.user = null;
- if (comment != null) {
- this.comment = comment;
- } else {
- this.comment = "";
- }
- }
-
- public LabelName getLabel() {
- return label;
- }
-
- public List<DataDescription> getDataDescriptions() {
- return dataDescriptions;
- }
-
- public BrokerDesc getBrokerDesc() {
- return brokerDesc;
- }
-
- public ResourceDesc getResourceDesc() {
- return resourceDesc;
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-
- @Deprecated
- public String getUser() {
- return user;
- }
-
- public EtlJobType getEtlJobType() {
- return etlJobType;
- }
-
- public static void checkProperties(Map<String, String> properties) throws DdlException {
- if (properties == null) {
- return;
- }
-
- for (Entry<String, String> entry : properties.entrySet()) {
- if (!PROPERTIES_MAP.containsKey(entry.getKey())) {
- throw new DdlException(entry.getKey() + " is invalid property");
- }
- }
-
- // exec mem
- final String execMemProperty = properties.get(EXEC_MEM_LIMIT);
- if (execMemProperty != null) {
- try {
- final long execMem = Long.valueOf(execMemProperty);
- if (execMem <= 0) {
- throw new DdlException(EXEC_MEM_LIMIT + " must be greater than 0");
- }
- } catch (NumberFormatException e) {
- throw new DdlException(EXEC_MEM_LIMIT + " is not a number.");
- }
- }
-
- // timeout
- final String timeoutLimitProperty = properties.get(TIMEOUT_PROPERTY);
- if (timeoutLimitProperty != null) {
- try {
- final int timeoutLimit = Integer.valueOf(timeoutLimitProperty);
- if (timeoutLimit < 0) {
- throw new DdlException(TIMEOUT_PROPERTY + " must be greater than 0");
- }
- } catch (NumberFormatException e) {
- throw new DdlException(TIMEOUT_PROPERTY + " is not a number.");
- }
- }
-
- // max filter ratio
- final String maxFilterRadioProperty = properties.get(MAX_FILTER_RATIO_PROPERTY);
- if (maxFilterRadioProperty != null) {
- try {
- double maxFilterRatio = Double.valueOf(maxFilterRadioProperty);
- if (maxFilterRatio < 0.0 || maxFilterRatio > 1.0) {
- throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " must between 0.0 and 1.0.");
- }
- } catch (NumberFormatException e) {
- throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " is not a number.");
- }
- }
-
- // strict mode
- final String strictModeProperty = properties.get(STRICT_MODE);
- if (strictModeProperty != null) {
- if (!strictModeProperty.equalsIgnoreCase("true")
- && !strictModeProperty.equalsIgnoreCase("false")) {
- throw new DdlException(STRICT_MODE + " is not a boolean");
- }
- }
-
- // partial update
- final String partialColumnsProperty = properties.get(PARTIAL_COLUMNS);
- if (partialColumnsProperty != null) {
- if (!partialColumnsProperty.equalsIgnoreCase("true")
- && !partialColumnsProperty.equalsIgnoreCase("false")) {
- throw new DdlException(PARTIAL_COLUMNS + " is not a boolean");
- }
- }
-
- // partial update new key policy
- final String partialUpdateNewKeyPolicyProperty = properties.get(PARTIAL_UPDATE_NEW_KEY_POLICY);
- if (partialUpdateNewKeyPolicyProperty != null) {
- if (!partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("append")
- && !partialUpdateNewKeyPolicyProperty.equalsIgnoreCase("error")) {
- throw new DdlException(PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of [append, error], but found "
- + partialUpdateNewKeyPolicyProperty);
- }
- }
-
- // time zone
- final String timezone = properties.get(TIMEZONE);
- if (timezone != null) {
- properties.put(TIMEZONE, TimeUtils.checkTimeZoneValidAndStandardize(
- properties.getOrDefault(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE)));
- }
-
- // send batch parallelism
- final String sendBatchParallelism = properties.get(SEND_BATCH_PARALLELISM);
- if (sendBatchParallelism != null) {
- try {
- final int sendBatchParallelismValue = Integer.valueOf(sendBatchParallelism);
- if (sendBatchParallelismValue < 1) {
- throw new DdlException(SEND_BATCH_PARALLELISM + " must be greater than 0");
- }
- } catch (NumberFormatException e) {
- throw new DdlException(SEND_BATCH_PARALLELISM + " is not a number.");
- }
- }
-
- // priority
- final String priority = properties.get(PRIORITY);
- if (priority != null) {
- try {
- LoadTask.Priority.valueOf(priority);
- } catch (IllegalArgumentException | NullPointerException e) {
- throw new DdlException(PRIORITY + " must be in [LOW/NORMAL/HIGH].");
- }
- }
- }
-
- @Override
- public void analyze() throws UserException {
- super.analyze();
- if (!isMysqlLoad) {
- label.analyze();
- }
- if (dataDescriptions == null || dataDescriptions.isEmpty()) {
- throw new AnalysisException("No data file in load statement.");
- }
- // check data descriptions, support 2 cases bellow:
- // case 1: multi file paths, multi data descriptions
- // case 2: one hive table, one data description
- boolean isLoadFromTable = false;
- for (DataDescription dataDescription : dataDescriptions) {
- if (brokerDesc == null && resourceDesc == null && !isMysqlLoad) {
- dataDescription.setIsHadoopLoad(true);
- }
-
- if (dataDescription.isLoadFromTable()) {
- isLoadFromTable = true;
- }
-
-
- if (brokerDesc != null && !brokerDesc.isMultiLoadBroker()) {
- for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
- String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
- dataDescription.getFilePaths().set(i, location);
- dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
- }
- }
- }
- if (isLoadFromTable) {
- if (dataDescriptions.size() > 1) {
- throw new AnalysisException("Only support one olap table load from one external table");
- }
- if (resourceDesc == null) {
- throw new AnalysisException("Load from table should use Spark Load");
- }
- }
-
- // mysql load only have one data desc.
- if (isMysqlLoad && !dataDescriptions.get(0).isClientLocal()) {
- for (String path : dataDescriptions.get(0).getFilePaths()) {
- if (Config.mysql_load_server_secure_path.isEmpty()) {
- throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it,"
- + " plz set the `mysql_load_server_secure_path` for FE to be a right path.");
- } else {
- File file = new File(path);
- try {
- if (!(file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path))) {
- throw new AnalysisException("Local file should be under the secure path of FE.");
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (!file.exists()) {
- throw new AnalysisException("File: " + path + " is not exists.");
- }
- }
- }
- }
-
- if (resourceDesc != null) {
- resourceDesc.analyze();
- etlJobType = resourceDesc.getEtlJobType();
- // check resource usage privilege
- if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(),
- resourceDesc.getName(),
- PrivPredicate.USAGE)) {
- throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
- + "'@'" + ConnectContext.get().getRemoteIP()
- + "' for resource '" + resourceDesc.getName() + "'");
- }
- } else if (brokerDesc != null) {
- etlJobType = EtlJobType.BROKER;
- checkS3Param();
- } else if (isMysqlLoad) {
- etlJobType = EtlJobType.LOCAL_FILE;
- } else {
- etlJobType = EtlJobType.UNKNOWN;
- }
-
- try {
- checkProperties(properties);
- } catch (DdlException e) {
- throw new AnalysisException(e.getMessage());
- }
-
- user = ConnectContext.get().getQualifiedUser();
- }
-
- public String getComment() {
- return comment;
- }
-
- @Override
- public boolean needAuditEncryption() {
- if (brokerDesc != null || resourceDesc != null) {
- return true;
- }
- return false;
- }
-
- public void setEtlJobType(EtlJobType etlJobType) {
- this.etlJobType = etlJobType;
- }
-
- @Override
- public String toSql() {
- StringBuilder sb = new StringBuilder();
- sb.append("LOAD LABEL ").append(label.toSql()).append("\n");
- sb.append("(");
- Joiner.on(",\n").appendTo(sb, Lists.transform(dataDescriptions, new Function<DataDescription, Object>() {
- @Override
- public Object apply(DataDescription dataDescription) {
- return dataDescription.toSql();
- }
- })).append(")");
- if (brokerDesc != null) {
- sb.append("\n").append(brokerDesc.toSql());
- }
- if (resourceDesc != null) {
- sb.append("\n").append(resourceDesc.toSql());
- }
-
- if (properties != null && !properties.isEmpty()) {
- sb.append("\nPROPERTIES (");
- sb.append(new PrintableMap<>(properties, "=", true, false, true));
- sb.append(")");
- }
- return sb.toString();
- }
-
- @Override
- public String toString() {
- return toSql();
- }
-
- public RedirectStatus getRedirectStatus() {
- if (isMysqlLoad) {
- return RedirectStatus.NO_FORWARD;
- } else {
- return RedirectStatus.FORWARD_WITH_SYNC;
- }
- }
-
- private void checkEndpoint(String endpoint) throws UserException {
- HttpURLConnection connection = null;
- try {
- String urlStr = endpoint;
- // Add default protocol if not specified
- if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) {
- urlStr = "http://" + endpoint;
- }
- SecurityChecker.getInstance().startSSRFChecking(urlStr);
- URL url = new URL(urlStr);
- connection = (HttpURLConnection) url.openConnection();
- connection.setConnectTimeout(10000);
- connection.connect();
- } catch (Exception e) {
- LOG.warn("Failed to connect endpoint={}, err={}", endpoint, e);
- String msg;
- if (e instanceof UserException) {
- msg = ((UserException) e).getDetailMessage();
- } else {
- msg = e.getMessage();
- }
- throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
- "Failed to access object storage, message=" + msg, e);
- } finally {
- if (connection != null) {
- try {
- connection.disconnect();
- } catch (Exception e) {
- LOG.warn("Failed to disconnect connection, endpoint={}, err={}", endpoint, e);
- }
- }
- SecurityChecker.getInstance().stopSSRFChecking();
- }
- }
-
- public void checkS3Param() throws UserException {
- if (brokerDesc.getFileType() != null && brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
-
- ObjectStorageProperties storageProperties = (ObjectStorageProperties) brokerDesc.getStorageProperties();
- String endpoint = storageProperties.getEndpoint();
- checkEndpoint(endpoint);
- checkWhiteList(endpoint);
- List<String> filePaths = new ArrayList<>();
- if (dataDescriptions != null && !dataDescriptions.isEmpty()) {
- for (DataDescription dataDescription : dataDescriptions) {
- if (dataDescription.getFilePaths() != null) {
- for (String filePath : dataDescription.getFilePaths()) {
- if (filePath != null && !filePath.isEmpty()) {
- filePaths.add(filePath);
- }
- }
- }
- }
- }
- }
- }
-
- public void checkWhiteList(String endpoint) throws UserException {
- endpoint = endpoint.replaceFirst("^http://", "");
- endpoint = endpoint.replaceFirst("^https://", "");
- List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
- whiteList.removeIf(String::isEmpty);
- if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
- throw new UserException("endpoint: " + endpoint
- + " is not in s3 load endpoint white list: " + String.join(",", whiteList));
- }
- }
-
- @Override
- public StmtType stmtType() {
- return StmtType.LOAD;
- }
-}
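The deleted class above also owned the PROPERTIES_MAP parser table and the static checkProperties() validation. As the LoadJob and CopyIntoInfo hunks below show, both now live on LoadCommand with the same shape. A minimal sketch of the replacement validation path, mirroring the CopyIntoInfo call site below (the wrapper class and method here are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.common.DdlException;
    import org.apache.doris.nereids.trees.plans.commands.LoadCommand;

    public class CheckPropsSketch {
        public static void validate() throws DdlException {
            Map<String, String> properties = new HashMap<>();
            properties.put(LoadCommand.STRICT_MODE, "true");
            // Throws DdlException for unknown keys or malformed values,
            // just as LoadStmt.checkProperties did.
            LoadCommand.checkProperties(properties);
        }
    }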
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 6fd266c15a7..d63426cec54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -17,7 +17,6 @@
package org.apache.doris.cloud.load;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -29,7 +28,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
-import org.apache.doris.common.UserException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.BrokerLoadJob;
import org.apache.doris.load.loadv2.JobState;
@@ -71,13 +69,6 @@ public class CloudLoadManager extends LoadManager {
this.cleanCopyJobScheduler = cleanCopyJobScheduler;
}
- @Override
- public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
- ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
-
- return super.createLoadJobFromStmt(stmt);
- }
-
public LoadJob createLoadJobFromCopyIntoCommand(CopyIntoCommand command) throws DdlException {
CopyIntoInfo copyIntoInfo = command.getCopyIntoInfo();
Database database = super.checkDb(copyIntoInfo.getDbName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index f87eb29d9d8..dd0e0d3a228 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -17,7 +17,6 @@
package org.apache.doris.job.extensions.insert;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Column;
@@ -45,6 +44,7 @@ import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -530,16 +530,16 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
}
private String getPriority() {
- return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
+ return properties.getOrDefault(LoadCommand.PRIORITY, Priority.NORMAL.name());
}
public double getMaxFilterRatio() {
- return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0"));
+ return Double.parseDouble(properties.getOrDefault(LoadCommand.MAX_FILTER_RATIO_PROPERTY, "0.0"));
}
public long getTimeout() {
- if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
- return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
+ if (properties.containsKey(LoadCommand.TIMEOUT_PROPERTY)) {
+ return Long.parseLong(properties.get(LoadCommand.TIMEOUT_PROPERTY));
}
return Config.broker_load_default_timeout_second;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 53c201fcea4..5c0ef0516d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -19,7 +19,6 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
@@ -148,37 +147,6 @@ public abstract class BulkLoadJob extends LoadJob implements GsonPostProcessable
}
}
- public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
- // get db id
- String dbName = stmt.getLabel().getDbName();
- Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-
- // create job
- BulkLoadJob bulkLoadJob;
- try {
- switch (stmt.getEtlJobType()) {
- case BROKER:
- bulkLoadJob = EnvFactory.getInstance().createBrokerLoadJob(db.getId(),
- stmt.getLabel().getLabelName(), stmt.getBrokerDesc(), stmt.getOrigStmt(),
- stmt.getUserInfo());
- break;
- case DELETE:
- case INSERT:
- throw new DdlException("LoadManager only support create broker load job from stmt.");
- default:
- throw new DdlException("Unknown load job type.");
- }
- bulkLoadJob.setComment(stmt.getComment());
- bulkLoadJob.setJobProperties(stmt.getProperties());
- bulkLoadJob.checkAndSetDataSourceInfo((Database) db, stmt.getDataDescriptions());
- // In the construction method, there may not be table information yet
- bulkLoadJob.rebuildAuthorizationInfo();
- return bulkLoadJob;
- } catch (MetaNotFoundException e) {
- throw new DdlException(e.getMessage());
- }
- }
-
public void checkAndSetDataSourceInfoByNereids(Database db, List<NereidsDataDescription> dataDescriptions, ConnectContext ctx)
throws Exception {
throws Exception {
// check data source info
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index bdc5f09b610..81d2de1ef4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.loadv2;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
@@ -47,6 +46,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -281,9 +281,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback
// set property from session variables
if (ConnectContext.get() != null) {
- jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, ConnectContext.get().getSessionVariable().getMaxExecMemByte());
- jobProperties.put(LoadStmt.TIMEZONE, ConnectContext.get().getSessionVariable().getTimeZone());
- jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM,
+ jobProperties.put(LoadCommand.EXEC_MEM_LIMIT,
+ ConnectContext.get().getSessionVariable().getMaxExecMemByte());
+ jobProperties.put(LoadCommand.TIMEZONE, ConnectContext.get().getSessionVariable().getTimeZone());
+ jobProperties.put(LoadCommand.SEND_BATCH_PARALLELISM,
ConnectContext.get().getSessionVariable().getSendBatchParallelism());
}
@@ -292,12 +293,12 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback
}
// set property from specified job properties
- for (String key : LoadStmt.PROPERTIES_MAP.keySet()) {
+ for (String key : LoadCommand.PROPERTIES_MAP.keySet()) {
if (!properties.containsKey(key)) {
continue;
}
try {
- jobProperties.put(key, LoadStmt.PROPERTIES_MAP.get(key).apply(properties.get(key)));
+ jobProperties.put(key, LoadCommand.PROPERTIES_MAP.get(key).apply(properties.get(key)));
} catch (Exception e) {
throw new DdlException("Failed to set property " + key + ". Error: " + e.getMessage());
}
@@ -330,17 +331,17 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback
default:
break;
}
- jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
- jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 2 * 1024 * 1024 * 1024L);
- jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, 0.0);
- jobProperties.put(LoadStmt.STRICT_MODE, false);
- jobProperties.put(LoadStmt.PARTIAL_COLUMNS, false);
- jobProperties.put(LoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY, TPartialUpdateNewRowPolicy.APPEND);
- jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
- jobProperties.put(LoadStmt.LOAD_PARALLELISM, Config.default_load_parallelism);
- jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1);
- jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, false);
- jobProperties.put(LoadStmt.PRIORITY, LoadTask.Priority.NORMAL);
+ jobProperties.put(LoadCommand.TIMEOUT_PROPERTY, timeout);
+ jobProperties.put(LoadCommand.EXEC_MEM_LIMIT, 2 * 1024 * 1024 * 1024L);
+ jobProperties.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, 0.0);
+ jobProperties.put(LoadCommand.STRICT_MODE, false);
+ jobProperties.put(LoadCommand.PARTIAL_COLUMNS, false);
+ jobProperties.put(LoadCommand.PARTIAL_UPDATE_NEW_KEY_POLICY, TPartialUpdateNewRowPolicy.APPEND);
+ jobProperties.put(LoadCommand.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
+ jobProperties.put(LoadCommand.LOAD_PARALLELISM, Config.default_load_parallelism);
+ jobProperties.put(LoadCommand.SEND_BATCH_PARALLELISM, 1);
+ jobProperties.put(LoadCommand.LOAD_TO_SINGLE_TABLET, false);
+ jobProperties.put(LoadCommand.PRIORITY, LoadTask.Priority.NORMAL);
}
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException,
@@ -1064,63 +1065,63 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback
// unit: second
public long getTimeout() {
- return (long) jobProperties.get(LoadStmt.TIMEOUT_PROPERTY);
+ return (long) jobProperties.get(LoadCommand.TIMEOUT_PROPERTY);
}
public void setTimeout(long timeout) {
- jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
+ jobProperties.put(LoadCommand.TIMEOUT_PROPERTY, timeout);
}
protected long getExecMemLimit() {
- return (long) jobProperties.get(LoadStmt.EXEC_MEM_LIMIT);
+ return (long) jobProperties.get(LoadCommand.EXEC_MEM_LIMIT);
}
public double getMaxFilterRatio() {
- return (double) jobProperties.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
+ return (double) jobProperties.get(LoadCommand.MAX_FILTER_RATIO_PROPERTY);
}
protected void setMaxFilterRatio(double maxFilterRatio) {
- jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
+ jobProperties.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
}
protected boolean isStrictMode() {
- return (boolean) jobProperties.get(LoadStmt.STRICT_MODE);
+ return (boolean) jobProperties.get(LoadCommand.STRICT_MODE);
}
protected boolean isPartialUpdate() {
- return (boolean) jobProperties.get(LoadStmt.PARTIAL_COLUMNS);
+ return (boolean) jobProperties.get(LoadCommand.PARTIAL_COLUMNS);
}
protected TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
- return (TPartialUpdateNewRowPolicy) jobProperties.get(LoadStmt.PARTIAL_UPDATE_NEW_KEY_POLICY);
+ return (TPartialUpdateNewRowPolicy) jobProperties.get(LoadCommand.PARTIAL_UPDATE_NEW_KEY_POLICY);
}
protected String getTimeZone() {
- return (String) jobProperties.get(LoadStmt.TIMEZONE);
+ return (String) jobProperties.get(LoadCommand.TIMEZONE);
}
public int getLoadParallelism() {
- if (jobProperties.get(LoadStmt.LOAD_PARALLELISM).getClass() == Integer.class) {
- return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM);
+ if (jobProperties.get(LoadCommand.LOAD_PARALLELISM).getClass() == Integer.class) {
+ return (int) jobProperties.get(LoadCommand.LOAD_PARALLELISM);
} else {
- return ((Long) jobProperties.get(LoadStmt.LOAD_PARALLELISM)).intValue();
+ return ((Long) jobProperties.get(LoadCommand.LOAD_PARALLELISM)).intValue();
}
}
public int getSendBatchParallelism() {
- if (jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM).getClass() == Integer.class) {
- return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM);
+ if (jobProperties.get(LoadCommand.SEND_BATCH_PARALLELISM).getClass() == Integer.class) {
+ return (int) jobProperties.get(LoadCommand.SEND_BATCH_PARALLELISM);
} else {
- return ((Long) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM)).intValue();
+ return ((Long) jobProperties.get(LoadCommand.SEND_BATCH_PARALLELISM)).intValue();
}
}
public LoadTask.Priority getPriority() {
- return (LoadTask.Priority) jobProperties.get(LoadStmt.PRIORITY);
+ return (LoadTask.Priority) jobProperties.get(LoadCommand.PRIORITY);
}
public boolean isSingleTabletLoadPerSink() {
- return (boolean) jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET);
+ return (boolean) jobProperties.get(LoadCommand.LOAD_TO_SINGLE_TABLET);
}
// Return true if this job is finished for a long time
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index bb74a6f9f50..1bc3040f9b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.loadv2;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -161,58 +160,6 @@ public class LoadManager implements Writable {
return loadJob.getId();
}
- /**
- * This method will be invoked by the broker load(v2) now.
- */
- public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
- List<TPipelineWorkloadGroup> twgList = null;
- if (Config.enable_workload_group) {
- try {
- twgList = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get())
- .stream()
- .map(e -> e.toThrift())
- .collect(Collectors.toList());
- } catch (Throwable t) {
- LOG.info("Get workload group failed when create load job,", t);
- throw t;
- }
- }
-
- Database database = checkDb(stmt.getLabel().getDbName());
- long dbId = database.getId();
- LoadJob loadJob;
- writeLock();
- try {
- checkLabelUsed(dbId, stmt.getLabel().getLabelName());
- if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
- throw new DdlException("LoadManager only support the broker and spark load.");
- }
- if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
- throw new DdlException(
- "There are more than " + Config.desired_max_waiting_jobs
- + " unfinished load jobs, please retry later. "
- + "You can use `SHOW LOAD` to view submitted jobs");
- }
-
- loadJob = BulkLoadJob.fromLoadStmt(stmt);
-
- if (twgList != null) {
- loadJob.settWorkloadGroups(twgList);
- }
-
- createLoadJob(loadJob);
- } finally {
- writeUnlock();
- }
-
- Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
-
- // The job must be submitted after edit log.
- // It guarantees that load job has not been changed before edit log.
- loadJobScheduler.submitJob(loadJob);
- return loadJob.getId();
- }
-
private long unprotectedGetUnfinishedJobNum() {
return idToLoadJob.values().stream()
.filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index f00894b8ac7..9ca03949884 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -19,7 +19,6 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.UserIdentity;
@@ -38,6 +37,7 @@ import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.nereids.load.NereidsDataDescription;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.load.MysqlDataDescription;
import org.apache.doris.nereids.trees.plans.commands.load.MysqlLoadCommand;
import org.apache.doris.qe.ConnectContext;
@@ -325,8 +325,8 @@ public class MysqlLoadManager {
}
private int extractTimeOut(NereidsDataDescription desc) {
- if (desc.getProperties() != null && desc.getProperties().containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
- return Integer.parseInt(desc.getProperties().get(LoadStmt.TIMEOUT_PROPERTY));
+ if (desc.getProperties() != null && desc.getProperties().containsKey(LoadCommand.TIMEOUT_PROPERTY)) {
+ return Integer.parseInt(desc.getProperties().get(LoadCommand.TIMEOUT_PROPERTY));
}
return -1;
}
@@ -339,8 +339,8 @@ public class MysqlLoadManager {
}
private int extractTimeOut(DataDescription desc) {
- if (desc.getProperties() != null && desc.getProperties().containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
- return Integer.parseInt(desc.getProperties().get(LoadStmt.TIMEOUT_PROPERTY));
+ if (desc.getProperties() != null && desc.getProperties().containsKey(LoadCommand.TIMEOUT_PROPERTY)) {
+ return Integer.parseInt(desc.getProperties().get(LoadCommand.TIMEOUT_PROPERTY));
}
return -1;
}
@@ -481,33 +481,33 @@ public class MysqlLoadManager {
CsvFileFormatProperties csvFileFormatProperties = (CsvFileFormatProperties) fileFormatProperties;
if (props != null) {
// max_filter_ratio
- if (props.containsKey(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO)) {
- String maxFilterRatio = props.get(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO);
- httpPut.addHeader(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO, maxFilterRatio);
+ if (props.containsKey(LoadCommand.KEY_IN_PARAM_MAX_FILTER_RATIO)) {
+ String maxFilterRatio = props.get(LoadCommand.KEY_IN_PARAM_MAX_FILTER_RATIO);
+ httpPut.addHeader(LoadCommand.KEY_IN_PARAM_MAX_FILTER_RATIO, maxFilterRatio);
}
// exec_mem_limit
- if (props.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
- String memory = props.get(LoadStmt.EXEC_MEM_LIMIT);
- httpPut.addHeader(LoadStmt.EXEC_MEM_LIMIT, memory);
+ if (props.containsKey(LoadCommand.EXEC_MEM_LIMIT)) {
+ String memory = props.get(LoadCommand.EXEC_MEM_LIMIT);
+ httpPut.addHeader(LoadCommand.EXEC_MEM_LIMIT, memory);
}
// strict_mode
- if (props.containsKey(LoadStmt.STRICT_MODE)) {
- String strictMode = props.get(LoadStmt.STRICT_MODE);
- httpPut.addHeader(LoadStmt.STRICT_MODE, strictMode);
+ if (props.containsKey(LoadCommand.STRICT_MODE)) {
+ String strictMode = props.get(LoadCommand.STRICT_MODE);
+ httpPut.addHeader(LoadCommand.STRICT_MODE, strictMode);
}
// timeout
- if (props.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
- String timeout = props.get(LoadStmt.TIMEOUT_PROPERTY);
- httpPut.addHeader(LoadStmt.TIMEOUT_PROPERTY, timeout);
+ if (props.containsKey(LoadCommand.TIMEOUT_PROPERTY)) {
+ String timeout = props.get(LoadCommand.TIMEOUT_PROPERTY);
+ httpPut.addHeader(LoadCommand.TIMEOUT_PROPERTY, timeout);
}
// timezone
- if (props.containsKey(LoadStmt.TIMEZONE)) {
- String timezone = props.get(LoadStmt.TIMEZONE);
- httpPut.addHeader(LoadStmt.TIMEZONE, timezone);
+ if (props.containsKey(LoadCommand.TIMEZONE)) {
+ String timezone = props.get(LoadCommand.TIMEZONE);
+ httpPut.addHeader(LoadCommand.TIMEZONE, timezone);
}
}
@@ -525,7 +525,7 @@ public class MysqlLoadManager {
// columns
String columns = getColumns(desc);
if (columns != null) {
- httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMNS, columns);
+ httpPut.addHeader(LoadCommand.KEY_IN_PARAM_COLUMNS, columns);
}
// partitions
@@ -533,9 +533,9 @@ public class MysqlLoadManager {
List<String> ps = desc.getPartitionNames().getPartitionNames();
String pNames = Joiner.on(",").join(ps);
if (desc.getPartitionNames().isTemp()) {
- httpPut.addHeader(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS, pNames);
+ httpPut.addHeader(LoadCommand.KEY_IN_PARAM_TEMP_PARTITIONS, pNames);
} else {
- httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames);
+ httpPut.addHeader(LoadCommand.KEY_IN_PARAM_PARTITIONS, pNames);
}
}
@@ -551,7 +551,7 @@ public class MysqlLoadManager {
if (Strings.isNullOrEmpty(clusterName)) {
throw new LoadException("cloud compute group is empty");
}
- httpPut.addHeader(LoadStmt.KEY_CLOUD_CLUSTER, clusterName);
+ httpPut.addHeader(LoadCommand.KEY_CLOUD_CLUSTER, clusterName);
}
httpPut.setEntity(entity);
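The MysqlLoadManager hunks above forward user-supplied load properties to the backend as stream-load HTTP headers; only the constant holder changed, not the header names. A condensed sketch of that forwarding, using Apache HttpClient's HttpPut as the file itself does (the wrapper class here is hypothetical):

    import java.util.Map;

    import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
    import org.apache.http.client.methods.HttpPut;

    public class HeaderSketch {
        public static void addLoadHeaders(HttpPut httpPut, Map<String, String> props) {
            // Header names are the raw property keys, e.g. "strict_mode" and "timeout".
            if (props.containsKey(LoadCommand.STRICT_MODE)) {
                httpPut.addHeader(LoadCommand.STRICT_MODE, props.get(LoadCommand.STRICT_MODE));
            }
            if (props.containsKey(LoadCommand.TIMEOUT_PROPERTY)) {
                httpPut.addHeader(LoadCommand.TIMEOUT_PROPERTY, props.get(LoadCommand.TIMEOUT_PROPERTY));
            }
        }
    }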
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 3717b377ff2..ba95a1306e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -19,7 +19,6 @@ package org.apache.doris.load.routineload;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnsStmt;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.UserIdentity;
@@ -54,6 +53,7 @@ import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo;
import org.apache.doris.nereids.load.NereidsStreamLoadPlanner;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
@@ -619,7 +619,7 @@ public abstract class RoutineLoadJob
}
public boolean isStrictMode() {
- String value = jobProperties.get(LoadStmt.STRICT_MODE);
+ String value = jobProperties.get(LoadCommand.STRICT_MODE);
if (value == null) {
return DEFAULT_STRICT_MODE;
}
@@ -650,7 +650,7 @@ public abstract class RoutineLoadJob
}
public String getTimezone() {
- String value = jobProperties.get(LoadStmt.TIMEZONE);
+ String value = jobProperties.get(LoadCommand.TIMEZONE);
if (value == null) {
return TimeUtils.DEFAULT_TIME_ZONE;
}
@@ -1780,9 +1780,9 @@ public abstract class RoutineLoadJob
appendProperties(sb, JsonFileFormatProperties.PROP_NUM_AS_STRING, isNumAsString(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_JSON_ROOT, getJsonRoot(), false);
- appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
- appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
- appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
+ appendProperties(sb, LoadCommand.STRICT_MODE, isStrictMode(), false);
+ appendProperties(sb, LoadCommand.TIMEZONE, getTimezone(), false);
+ appendProperties(sb, LoadCommand.EXEC_MEM_LIMIT, getMemLimit(), true);
sb.append(")\n");
// 6. data_source
sb.append("FROM ").append(dataSourceType).append("\n");
@@ -1857,14 +1857,14 @@ public abstract class RoutineLoadJob
if (getFormat().equalsIgnoreCase("json")) {
jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
} else {
- jobProperties.put(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR,
+ jobProperties.put(LoadCommand.KEY_IN_PARAM_COLUMN_SEPARATOR,
columnSeparator == null ? "\t" : columnSeparator.toString());
- jobProperties.put(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER,
+ jobProperties.put(LoadCommand.KEY_IN_PARAM_LINE_DELIMITER,
lineDelimiter == null ? "\n" : lineDelimiter.toString());
}
- jobProperties.put(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION,
+ jobProperties.put(LoadCommand.KEY_IN_PARAM_DELETE_CONDITION,
deleteCondition == null ? STAR_STRING : deleteCondition.toSqlWithoutTbl());
- jobProperties.put(LoadStmt.KEY_IN_PARAM_SEQUENCE_COL,
+ jobProperties.put(LoadCommand.KEY_IN_PARAM_SEQUENCE_COL,
sequenceCol == null ? STAR_STRING : sequenceCol);
// job properties defined in CreateRoutineLoadStmt
@@ -1877,8 +1877,8 @@ public abstract class RoutineLoadJob
String.valueOf(currentTaskConcurrentNum));
jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY,
String.valueOf(desireTaskConcurrentNum));
- jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(execMemLimit));
- jobProperties.put(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, mergeType.toString());
+ jobProperties.put(LoadCommand.EXEC_MEM_LIMIT, String.valueOf(execMemLimit));
+ jobProperties.put(LoadCommand.KEY_IN_PARAM_MERGE_TYPE, mergeType.toString());
jobProperties.putAll(this.jobProperties);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(jobProperties);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index a9ecf76523c..8fac34ecf28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -17,7 +17,6 @@
package org.apache.doris.nereids.load;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.common.Config;
@@ -25,6 +24,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@@ -123,7 +123,7 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
@Override
public String getTimezone() {
- String value = jobProperties.get(LoadStmt.TIMEZONE);
+ String value = jobProperties.get(LoadCommand.TIMEZONE);
if (value == null) {
return TimeUtils.DEFAULT_TIME_ZONE;
}
@@ -232,7 +232,7 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
@Override
public boolean isStrictMode() {
- String value = jobProperties.get(LoadStmt.STRICT_MODE);
+ String value = jobProperties.get(LoadCommand.STRICT_MODE);
if (value == null) {
return DEFAULT_STRICT_MODE;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index debf6126a99..1de29d7c1f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.StmtType;
@@ -84,7 +83,7 @@ public class ExportCommand extends Command implements NeedAuditEncryption, Forwa
.add(LABEL)
.add(PARALLELISM)
.add(DATA_CONSISTENCY)
- .add(LoadStmt.KEY_IN_PARAM_COLUMNS)
+ .add(LoadCommand.KEY_IN_PARAM_COLUMNS)
.add(OutFileClause.PROP_MAX_FILE_SIZE)
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
@@ -284,7 +283,7 @@ public class ExportCommand extends Command implements NeedAuditEncryption, Forwa
exportJob.setLineDelimiter(lineDelimiter);
// set format
- exportJob.setFormat(fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE, "csv")
+ exportJob.setFormat(fileProperties.getOrDefault(LoadCommand.KEY_IN_PARAM_FORMAT_TYPE, "csv")
.toLowerCase());
// set withBom
@@ -313,7 +312,7 @@ public class ExportCommand extends Command implements NeedAuditEncryption, Forwa
// null means not specified
// "" means user specified zero columns
// if fileProperties contains KEY_IN_PARAM_COLUMNS, the columns have been checked in check phases
- String columns = fileProperties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, null);
+ String columns = fileProperties.getOrDefault(LoadCommand.KEY_IN_PARAM_COLUMNS, null);
exportJob.setColumns(columns);
if (columns != null) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index eb73cc8ec4f..6905191a0f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -19,7 +19,6 @@ package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.analysis.UserIdentity;
@@ -360,7 +359,7 @@ public class LoadCommand extends Command implements NeedAuditEncryption, Forward
final String timezone = properties.get(TIMEZONE);
if (timezone != null) {
properties.put(TIMEZONE, TimeUtils.checkTimeZoneValidAndStandardize(
- properties.getOrDefault(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE)));
+ properties.getOrDefault(TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE)));
}
// send batch parallelism
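
This hunk is slightly more than a mechanical rename: the surrounding guard reads properties.get(TIMEZONE), so the inner lookup now uses the same local constant instead of LoadStmt.TIMEZONE (the two constants presumably carried the same string, so behavior should be unchanged). A standalone sketch of the normalize-in-place step, where standardize() is a hypothetical stand-in for TimeUtils.checkTimeZoneValidAndStandardize:

    import java.time.ZoneId;
    import java.util.HashMap;
    import java.util.Map;

    public class TimezoneNormalizeSketch {
        private static final String TIMEZONE = "timezone";

        // Hypothetical stand-in for TimeUtils.checkTimeZoneValidAndStandardize:
        // reject invalid zones and return the canonical form.
        static String standardize(String tz) {
            return ZoneId.of(tz).getId();
        }

        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>();
            properties.put(TIMEZONE, "Asia/Shanghai");

            // Same shape as the hunk above: if the key is present, overwrite it
            // in place with the standardized value.
            if (properties.get(TIMEZONE) != null) {
                properties.put(TIMEZONE, standardize(properties.getOrDefault(TIMEZONE, "UTC")));
            }
            System.out.println(properties); // {timezone=Asia/Shanghai}
        }
    }
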
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
index a77266b20fd..eb6c47f00f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
@@ -24,7 +24,6 @@ import org.apache.doris.analysis.CopyIntoProperties;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StageAndPattern;
@@ -68,6 +67,7 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
@@ -289,7 +289,7 @@ public class CopyIntoInfo {
try {
properties.putAll(copyIntoProperties.getExecProperties());
// TODO support exec params as LoadStmt
- LoadStmt.checkProperties(properties);
+ LoadCommand.checkProperties(properties);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
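
The call site keeps the existing pattern of translating the checked DdlException from property validation into the analyzer's AnalysisException. A minimal sketch of that translation, with local stand-in exception types and a hypothetical validator in the role of LoadCommand.checkProperties:

    import java.util.HashMap;
    import java.util.Map;

    public class CheckPropertiesSketch {
        // Local stand-ins for org.apache.doris.common.DdlException and the
        // analyzer's AnalysisException.
        static class DdlException extends Exception {
            DdlException(String msg) { super(msg); }
        }

        static class AnalysisException extends RuntimeException {
            AnalysisException(String msg) { super(msg); }
        }

        // Hypothetical validator in the role of LoadCommand.checkProperties.
        static void checkProperties(Map<String, String> props) throws DdlException {
            if (props.containsKey("unknown_property")) {
                throw new DdlException("unknown_property is not supported");
            }
        }

        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>();
            properties.put("unknown_property", "1");
            try {
                checkProperties(properties);
            } catch (DdlException e) {
                // Same translation as the hunk above: surface the checked DDL
                // error as an unchecked analysis error.
                throw new AnalysisException(e.getMessage());
            }
        }
    }
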
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 52366df3a3e..5ec71bcb43b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,22 +17,16 @@
package org.apache.doris.load.loadv2;
-import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.DataDescription;
-import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
-import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.load.NereidsLoadingTaskPlanner;
@@ -49,7 +43,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -62,122 +55,6 @@ public class BrokerLoadJobTest {
MetricRepo.init();
}
- @Test
- public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName,
- @Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog,
- @Injectable Database database) {
- List<DataDescription> dataDescriptionList = Lists.newArrayList();
- dataDescriptionList.add(dataDescription);
-
- String tableName = "table";
- String databaseName = "database";
- new Expectations() {
- {
- loadStmt.getLabel();
- minTimes = 0;
- result = labelName;
- labelName.getDbName();
- minTimes = 0;
- result = databaseName;
- env.getInternalCatalog();
- minTimes = 0;
- result = catalog;
- catalog.getDbNullable(databaseName);
- minTimes = 0;
- result = database;
- loadStmt.getDataDescriptions();
- minTimes = 0;
- result = dataDescriptionList;
- dataDescription.getTableName();
- minTimes = 0;
- result = tableName;
- database.getTableNullable(tableName);
- minTimes = 0;
- result = null;
- }
- };
-
- try {
- BulkLoadJob.fromLoadStmt(loadStmt);
- Assert.fail();
- } catch (DdlException e) {
- System.out.println("could not find table named " + tableName);
- }
-
- }
-
- @Test
- public void testFromLoadStmt2(@Injectable LoadStmt loadStmt, @Injectable DataDescription dataDescription,
- @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable,
- @Mocked Env env, @Mocked InternalCatalog catalog) {
-
- String label = "label";
- long dbId = 1;
- String tableName = "table";
- String databaseName = "database";
- List<DataDescription> dataDescriptionList = Lists.newArrayList();
- dataDescriptionList.add(dataDescription);
- BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
- Map<String, String> properties = new HashMap<>();
- properties.put(LoadStmt.PRIORITY, "HIGH");
-
- new Expectations() {
- {
- loadStmt.getLabel();
- minTimes = 0;
- result = labelName;
- labelName.getDbName();
- minTimes = 0;
- result = databaseName;
- labelName.getLabelName();
- minTimes = 0;
- result = label;
- env.getInternalCatalog();
- minTimes = 0;
- result = catalog;
- catalog.getDbNullable(databaseName);
- minTimes = 0;
- result = database;
- loadStmt.getDataDescriptions();
- minTimes = 0;
- result = dataDescriptionList;
- dataDescription.getTableName();
- minTimes = 0;
- result = tableName;
- database.getTableNullable(tableName);
- minTimes = 0;
- result = olapTable;
- dataDescription.getPartitionNames();
- minTimes = 0;
- result = null;
- database.getId();
- minTimes = 0;
- result = dbId;
- loadStmt.getBrokerDesc();
- minTimes = 0;
- result = brokerDesc;
- loadStmt.getEtlJobType();
- minTimes = 0;
- result = EtlJobType.BROKER;
- loadStmt.getProperties();
- minTimes = 0;
- result = properties;
- }
- };
-
- try {
- BrokerLoadJob brokerLoadJob = (BrokerLoadJob) BulkLoadJob.fromLoadStmt(loadStmt);
- Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
- Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
- Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
- Assert.assertEquals(EtlJobType.BROKER, Deencapsulation.getField(brokerLoadJob, "jobType"));
- Assert.assertEquals(brokerLoadJob.getPriority(), LoadTask.Priority.HIGH);
- } catch (DdlException e) {
- Assert.fail(e.getMessage());
- }
-
- }
-
@Test
public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
@Injectable BrokerFileGroup brokerFileGroup,
@Mocked Env env, @Mocked InternalCatalog catalog,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index c14f0837366..92a39ba5f48 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.loadv2;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@@ -29,6 +28,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.metric.LongCounterMetric;
import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.persist.EditLog;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.TUniqueId;
@@ -57,7 +57,7 @@ public class LoadJobTest {
@Test
public void testSetJobPropertiesWithErrorTimeout() {
Map<String, String> jobProperties = Maps.newHashMap();
- jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, "abc");
+ jobProperties.put(LoadCommand.TIMEOUT_PROPERTY, "abc");
LoadJob loadJob = new BrokerLoadJob();
try {
loadJob.setJobProperties(jobProperties);
@@ -70,10 +70,10 @@ public class LoadJobTest {
@Test
public void testSetJobProperties() {
Map<String, String> jobProperties = Maps.newHashMap();
- jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, "1000");
- jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.1");
- jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, "1024");
- jobProperties.put(LoadStmt.STRICT_MODE, "True");
+ jobProperties.put(LoadCommand.TIMEOUT_PROPERTY, "1000");
+ jobProperties.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, "0.1");
+ jobProperties.put(LoadCommand.EXEC_MEM_LIMIT, "1024");
+ jobProperties.put(LoadCommand.STRICT_MODE, "True");
LoadJob loadJob = new BrokerLoadJob();
try {
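
The test changes are again constant-owner swaps; the assertion shape stays the same. For readers unfamiliar with it, a self-contained JUnit 4 sketch of the "invalid property value must be rejected" pattern (the key string and parse helper are illustrative stand-ins; the real test expects a DdlException from LoadJob.setJobProperties):

    import java.util.Map;

    import com.google.common.collect.Maps;
    import org.junit.Assert;
    import org.junit.Test;

    public class JobPropertiesSketchTest {
        private static final String TIMEOUT_PROPERTY = "timeout"; // stand-in for LoadCommand.TIMEOUT_PROPERTY

        // Hypothetical stand-in for LoadJob.setJobProperties: parse and
        // validate the timeout, rejecting non-numeric values.
        static long parseTimeout(Map<String, String> props) {
            return Long.parseLong(props.get(TIMEOUT_PROPERTY));
        }

        @Test
        public void rejectsNonNumericTimeout() {
            Map<String, String> jobProperties = Maps.newHashMap();
            jobProperties.put(TIMEOUT_PROPERTY, "abc");
            try {
                parseTimeout(jobProperties);
                Assert.fail("a non-numeric timeout should be rejected");
            } catch (NumberFormatException expected) {
                // pass: mirrors the DdlException the real test expects
            }
        }
    }
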
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java
index 55e9185e490..bf7477fd74d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java
@@ -18,7 +18,6 @@
package org.apache.doris.persist;
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -27,6 +26,7 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.qe.OriginStatement;
import com.google.common.collect.Maps;
@@ -51,7 +51,7 @@ public class LoadJobV2PersistTest {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob(1L, "label", brokerDesc, originStatement, UserIdentity.ADMIN);
Map<String, String> jobProperties = Maps.newHashMap();
- jobProperties.put(LoadStmt.LOAD_PARALLELISM, "5");
+ jobProperties.put(LoadCommand.LOAD_PARALLELISM, "5");
brokerLoadJob.setJobProperties(jobProperties);
return brokerLoadJob;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]