lsy3993 commented on code in PR #53013:
URL: https://github.com/apache/doris/pull/53013#discussion_r2196993687
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java:
##########
@@ -17,485 +17,608 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.StmtType;
-import org.apache.doris.catalog.Column;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.TableIf;
+import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.NereidsException;
-import org.apache.doris.common.profile.Profile;
-import org.apache.doris.common.util.FileFormatConstants;
-import org.apache.doris.common.util.FileFormatUtils;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.job.base.JobExecuteType;
-import org.apache.doris.job.base.JobExecutionConfiguration;
-import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.LoadJobRowResult;
+import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.nereids.analyzer.UnboundAlias;
-import org.apache.doris.nereids.analyzer.UnboundSlot;
-import org.apache.doris.nereids.analyzer.UnboundStar;
-import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
-import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
-import org.apache.doris.nereids.exceptions.MustFallbackException;
-import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
-import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.load.NereidsDataDescription;
import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
-import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
-import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.nereids.util.ExpressionUtils;
-import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.tablefunction.HdfsTableValuedFunction;
-import org.apache.doris.tablefunction.S3TableValuedFunction;
+import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.UUID;
/**
* load OLAP table data from external bulk file
*/
public class LoadCommand extends Command implements NeedAuditEncryption,
ForwardWithSync {
+ public static final String TIMEOUT_PROPERTY = "timeout";
+ public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
+ public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+ public static final String CLUSTER_PROPERTY = "cluster";
+ public static final String STRICT_MODE = "strict_mode";
+ public static final String TIMEZONE = "timezone";
+ public static final String LOAD_PARALLELISM = "load_parallelism";
+ public static final String SEND_BATCH_PARALLELISM =
"send_batch_parallelism";
+ public static final String PRIORITY = "priority";
+ public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
+
+ // deprecated, keeping this property to make LoadStmt#checkProperties() happy
+ public static final String USE_NEW_LOAD_SCAN_NODE =
"use_new_load_scan_node";
+
+ // for load data from Baidu Object Store(BOS) todo wait new property support
+ public static final String BOS_ENDPOINT = "bos_endpoint";
+ public static final String BOS_ACCESSKEY = "bos_accesskey";
+ public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";
+
+ // mini load params
+ public static final String KEY_IN_PARAM_COLUMNS = "columns";
+ public static final String KEY_IN_PARAM_SET = "set";
+ public static final String KEY_IN_PARAM_HLL = "hll";
+ public static final String KEY_IN_PARAM_COLUMN_SEPARATOR =
"column_separator";
+ public static final String KEY_IN_PARAM_LINE_DELIMITER = "line_delimiter";
+ public static final String KEY_IN_PARAM_PARTITIONS = "partitions";
+ public static final String KEY_IN_PARAM_FORMAT_TYPE = "format";
+
+ public static final String KEY_IN_PARAM_WHERE = "where";
+ public static final String KEY_IN_PARAM_MAX_FILTER_RATIO =
"max_filter_ratio";
+ public static final String KEY_IN_PARAM_TIMEOUT = "timeout";
+ public static final String KEY_IN_PARAM_TEMP_PARTITIONS =
"temporary_partitions";
+ public static final String KEY_IN_PARAM_NEGATIVE = "negative";
+ public static final String KEY_IN_PARAM_STRICT_MODE = "strict_mode";
+ public static final String KEY_IN_PARAM_TIMEZONE = "timezone";
+ public static final String KEY_IN_PARAM_EXEC_MEM_LIMIT = "exec_mem_limit";
+ public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths";
+ public static final String KEY_IN_PARAM_JSONROOT = "json_root";
+ public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY =
"strip_outer_array";
+ public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
+ public static final String KEY_IN_PARAM_NUM_AS_STRING = "num_as_string";
+ public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
+ public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
+ public static final String KEY_IN_PARAM_FUNCTION_COLUMN =
"function_column";
+ public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col";
+ public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id";
+ public static final String KEY_SKIP_LINES = "skip_lines";
+ public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+ public static final String PARTIAL_COLUMNS = "partial_columns";
+ public static final String PARTIAL_UPDATE_NEW_KEY_POLICY =
"partial_update_new_key_behavior";
+ public static final String KEY_COMMENT = "comment";
+ public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
+ public static final String KEY_ENCLOSE = "enclose";
+ public static final String KEY_ESCAPE = "escape";
+ public static final ImmutableMap<String, Function> PROPERTIES_MAP = new
ImmutableMap.Builder<String, Function>()
+ .put(TIMEOUT_PROPERTY, new Function<String, Long>() {
+ @Override
+ public @Nullable Long apply(@Nullable String s) {
+ return Long.valueOf(s);
+ }
+ })
+ .put(MAX_FILTER_RATIO_PROPERTY, new Function<String, Double>() {
+ @Override
+ public @Nullable Double apply(@Nullable String s) {
+ return Double.valueOf(s);
+ }
+ })
+ .put(EXEC_MEM_LIMIT, new Function<String, Long>() {
+ @Override
+ public @Nullable Long apply(@Nullable String s) {
+ return Long.valueOf(s);
+ }
+ })
+ .put(STRICT_MODE, new Function<String, Boolean>() {
+ @Override
+ public @Nullable Boolean apply(@Nullable String s) {
+ return Boolean.valueOf(s);
+ }
+ })
+ .put(PARTIAL_COLUMNS, new Function<String, Boolean>() {
+ @Override
+ public @Nullable Boolean apply(@Nullable String s) {
+ return Boolean.valueOf(s);
+ }
+ })
+ .put(PARTIAL_UPDATE_NEW_KEY_POLICY, new Function<String,
TPartialUpdateNewRowPolicy>() {
+ @Override
+ public @Nullable TPartialUpdateNewRowPolicy apply(@Nullable
String s) {
+ return TPartialUpdateNewRowPolicy.valueOf(s.toUpperCase());
+ }
+ })
+ .put(TIMEZONE, new Function<String, String>() {
+ @Override
+ public @Nullable String apply(@Nullable String s) {
+ return s;
+ }
+ })
+ .put(LOAD_PARALLELISM, new Function<String, Integer>() {
+ @Override
+ public @Nullable Integer apply(@Nullable String s) {
+ return Integer.valueOf(s);
+ }
+ })
+ .put(SEND_BATCH_PARALLELISM, new Function<String, Integer>() {
+ @Override
+ public @Nullable Integer apply(@Nullable String s) {
+ return Integer.valueOf(s);
+ }
+ })
+ .put(CLUSTER_PROPERTY, new Function<String, String>() {
+ @Override
+ public @Nullable String apply(@Nullable String s) {
+ return s;
+ }
+ })
+ .put(LOAD_TO_SINGLE_TABLET, new Function<String, Boolean>() {
+ @Override
+ public @Nullable Boolean apply(@Nullable String s) {
+ return Boolean.valueOf(s);
+ }
+ })
+ .put(USE_NEW_LOAD_SCAN_NODE, new Function<String, Boolean>() {
+ @Override
+ public @Nullable Boolean apply(@Nullable String s) {
+ return Boolean.valueOf(s);
+ }
+ })
+ .put(KEY_SKIP_LINES, new Function<String, Integer>() {
+ @Override
+ public @Nullable Integer apply(@Nullable String s) {
+ return Integer.valueOf(s);
+ }
+ })
+ .put(KEY_TRIM_DOUBLE_QUOTES, new Function<String, Boolean>() {
+ @Override
+ public @Nullable Boolean apply(@Nullable String s) {
+ return Boolean.valueOf(s);
+ }
+ })
+ .put(PRIORITY, (Function<String, LoadTask.Priority>) s ->
LoadTask.Priority.valueOf(s))
+ .build();
+ private static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+ private final LabelName label;
+ private final List<NereidsDataDescription> dataDescriptions;
+ private final BrokerDesc brokerDesc;
+ private final ResourceDesc resourceDesc;
+ private final Map<String, String> properties;
+ private String user;
- public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+ private boolean isMysqlLoad = false;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]