This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 678dc366e0b [fix](export) fix timeout property not work for export job
(#25913)
678dc366e0b is described below
commit 678dc366e0b2a3024647a963e11166a7feeae9c7
Author: caiconghui <[email protected]>
AuthorDate: Thu Oct 26 18:51:57 2023 +0800
[fix](export) fix timeout property not work for export job (#25913)
Co-authored-by: caiconghui1 <[email protected]>
---
.../java/org/apache/doris/analysis/ExportStmt.java | 22 +++++++++++++++++++---
.../org/apache/doris/load/ExportTaskExecutor.java | 1 +
.../trees/plans/commands/ExportCommand.java | 18 ++++++++++++++++--
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 35e658d446d..b6ec8df488b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -58,7 +58,7 @@ import java.util.stream.Collectors;
// EXPORT TABLE table_name [PARTITION (name1[, ...])]
// TO 'export_target_path'
// [PROPERTIES("key"="value")]
-// BY BROKER 'broker_name' [( $broker_attrs)]
+// WITH BROKER 'broker_name' [( $broker_attrs)]
@Getter
public class ExportStmt extends StatementBase {
public static final String PARALLELISM = "parallelism";
@@ -67,6 +67,7 @@ public class ExportStmt extends StatementBase {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
+ private static final Integer DEFAULT_TIMEOUT = 7200;
private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(LABEL)
@@ -76,6 +77,7 @@ public class ExportStmt extends StatementBase {
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
+ .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
.build();
@@ -97,6 +99,8 @@ public class ExportStmt extends StatementBase {
private Integer parallelism;
+ private Integer timeout;
+
private String maxFileSize;
private String deleteExistingFiles;
private SessionVariable sessionVariables;
@@ -118,6 +122,7 @@ public class ExportStmt extends StatementBase {
this.brokerDesc = brokerDesc;
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
+ this.timeout = DEFAULT_TIMEOUT;
Optional<SessionVariable> optionalSessionVariable =
Optional.ofNullable(
ConnectContext.get().getSessionVariable());
@@ -232,8 +237,10 @@ public class ExportStmt extends StatementBase {
// set sessions
exportJob.setQualifiedUser(this.qualifiedUser);
exportJob.setUserIdentity(this.userIdentity);
- exportJob.setSessionVariables(this.sessionVariables);
- exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
+ SessionVariable clonedSessionVariable =
VariableMgr.cloneSessionVariable(Optional.ofNullable(
+
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
+ exportJob.setSessionVariables(clonedSessionVariable);
+ exportJob.setTimeoutSecond(this.timeout);
exportJob.setOrigStmt(this.getOrigStmt());
}
@@ -323,6 +330,15 @@ public class ExportStmt extends StatementBase {
throw new UserException("The value of parallelism is invalid!");
}
+ // timeout
+ String timeoutString =
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
+ String.valueOf(DEFAULT_TIMEOUT));
+ try {
+ this.timeout = Integer.parseInt(timeoutString);
+ } catch (NumberFormatException e) {
+ throw new UserException("The value of timeout is invalid!");
+ }
+
// max_file_size
this.maxFileSize =
properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
this.deleteExistingFiles =
properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index c7d7c4032ce..b647154dc11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -168,6 +168,7 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
private AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
+
exportJob.getSessionVariables().setQueryTimeoutS(exportJob.getTimeoutSecond());
connectContext.setSessionVariable(exportJob.getSessionVariables());
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(exportJob.getTableName().getDb());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index 42ae03aaec4..f2cd13a4793 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -68,7 +68,7 @@ import java.util.stream.Collectors;
* EXPORT TABLE table_name [PARTITION (name1[, ...])]
* TO 'export_target_path'
* [PROPERTIES("key"="value")]
- * BY BROKER 'broker_name' [( $broker_attrs)]
+ * WITH BROKER 'broker_name' [( $broker_attrs)]
*/
public class ExportCommand extends Command implements ForwardWithSync {
public static final String PARALLELISM = "parallelism";
@@ -76,6 +76,8 @@ public class ExportCommand extends Command implements
ForwardWithSync {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
+ private static final Integer DEFAULT_TIMEOUT = 7200;
+
private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
@@ -84,6 +86,7 @@ public class ExportCommand extends Command implements
ForwardWithSync {
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
+ .add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
.build();
@@ -305,7 +308,18 @@ public class ExportCommand extends Command implements
ForwardWithSync {
SessionVariable clonedSessionVariable =
VariableMgr.cloneSessionVariable(Optional.ofNullable(
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
exportJob.setSessionVariables(clonedSessionVariable);
- exportJob.setTimeoutSecond(clonedSessionVariable.getQueryTimeoutS());
+
+ // set timeoutSecond
+ int timeoutSecond;
+ String timeoutString =
fileProperties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
+ String.valueOf(DEFAULT_TIMEOUT));
+ try {
+ timeoutSecond = Integer.parseInt(timeoutString);
+ } catch (NumberFormatException e) {
+ throw new UserException("The value of timeout is invalid!");
+ }
+
+ exportJob.setTimeoutSecond(timeoutSecond);
// exportJob generate outfile sql
exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx,
this.nameParts));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]