This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cf739e7496 [Enhancement](Stmt) Set insert_into timeout session
variable separately (#16343)
cf739e7496 is described below
commit cf739e74962c544a84208cf9c3121c219ddb392f
Author: 奕冷 <[email protected]>
AuthorDate: Sun Feb 12 16:56:10 2023 +0800
[Enhancement](Stmt) Set insert_into timeout session variable separately
(#16343)
---
be/src/exprs/runtime_filter.cpp | 4 +-
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/result_buffer_mgr.cpp | 4 +-
be/src/runtime/result_buffer_mgr.h | 2 +-
be/src/runtime/runtime_state.h | 2 +
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
be/src/vec/sink/vresult_file_sink.cpp | 2 +-
be/src/vec/sink/vresult_sink.cpp | 2 +-
be/src/vec/sink/vtablet_sink.cpp | 6 +--
docs/en/docs/advanced/variables.md | 7 ++-
.../import/import-way/insert-into-manual.md | 4 +-
.../Manipulation/INSERT.md | 2 +-
.../SET-VARIABLE.md | 1 +
docs/zh-CN/docs/advanced/variables.md | 6 ++-
.../import/import-way/insert-into-manual.md | 4 +-
.../Manipulation/INSERT.md | 2 +-
.../SET-VARIABLE.md | 1 +
.../java/org/apache/doris/analysis/InsertStmt.java | 2 +-
.../java/org/apache/doris/qe/ConnectContext.java | 55 ++++++++++++++++------
.../java/org/apache/doris/qe/ConnectProcessor.java | 2 +
.../main/java/org/apache/doris/qe/Coordinator.java | 7 +--
.../java/org/apache/doris/qe/SessionVariable.java | 15 ++++++
.../java/org/apache/doris/qe/StmtExecutor.java | 9 +++-
.../org/apache/doris/qe/ConnectContextTest.java | 8 +++-
.../org/apache/doris/qe/SessionVariablesTest.java | 2 +
.../java/org/apache/doris/qe/VariableMgrTest.java | 3 ++
gensrc/thrift/PaloInternalService.thrift | 4 ++
.../suites/datev2/ssb_sf0.1_p1/load.groovy | 4 +-
.../suites/datev2/ssb_sf1_p2/load.groovy | 4 +-
regression-test/suites/ssb_sf0.1_p1/load.groovy | 4 +-
regression-test/suites/ssb_sf100_p2/load.groovy | 4 +-
regression-test/suites/ssb_sf1_p2/load.groovy | 4 +-
tools/ssb-tools/bin/load-ssb-data.sh | 7 +--
33 files changed, 129 insertions(+), 60 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c8db62c3b6..7ec1f5a21c 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1159,7 +1159,7 @@ bool IRuntimeFilter::await() {
DCHECK(is_consumer());
// bitmap filter is precise filter and only filter once, so it must be
applied.
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
- ? _state->query_options().query_timeout
+ ? _state->execution_timeout()
: _state->runtime_filter_wait_time_ms();
if (_state->enable_pipeline_exec()) {
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
@@ -1209,7 +1209,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
// bitmap filter is precise filter and only filter once, so it must be
applied.
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
- ? _state->query_options().query_timeout
+ ? _state->execution_timeout()
: _state->runtime_filter_wait_time_ms();
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
if (!_state->enable_pipeline_exec()) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 654c252d58..c18f3e9866 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -192,7 +192,7 @@ FragmentExecState::FragmentExecState(const TUniqueId&
query_id,
Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
if (params.__isset.query_options) {
- _timeout_second = params.query_options.query_timeout;
+ _timeout_second = params.query_options.execution_timeout;
}
if (_fragments_ctx == nullptr) {
@@ -648,7 +648,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
pipeline_engine_enabled);
- fragments_ctx->timeout_second = params.query_options.query_timeout;
+ fragments_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, fragments_ctx.get());
bool has_query_mem_tracker =
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index 35d533e4a2..19fb2522b8 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -55,7 +55,7 @@ Status ResultBufferMgr::init() {
Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int
buffer_size,
std::shared_ptr<BufferControlBlock>*
sender,
- bool enable_pipeline, int query_timeout)
{
+ bool enable_pipeline, int exec_timout) {
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance "
<< query_id;
@@ -78,7 +78,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
// otherwise in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
// add extra 5s for avoid corner case
- int64_t max_timeout = time(nullptr) + query_timeout + 5;
+ int64_t max_timeout = time(nullptr) + exec_timout + 5;
cancel_at_time(max_timeout, query_id);
}
*sender = control_block;
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index 426af75047..250551f42a 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -48,7 +48,7 @@ public:
// sender is not used when call cancel or unregister
Status create_sender(const TUniqueId& query_id, int buffer_size,
std::shared_ptr<BufferControlBlock>* sender, bool
enable_pipeline,
- int query_timeout);
+ int exec_timeout);
void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7a555e0336..eb185472c7 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -86,6 +86,8 @@ public:
}
int max_errors() const { return _query_options.max_errors; }
int query_timeout() const { return _query_options.query_timeout; }
+ int insert_timeout() const { return _query_options.insert_timeout; }
+ int execution_timeout() const { return _query_options.execution_timeout; }
int max_io_buffers() const { return _query_options.max_io_buffers; }
int num_scanner_threads() const { return
_query_options.num_scanner_threads; }
TQueryType::type query_type() const { return _query_options.query_type; }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index c7a9e62335..4260882c3c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -59,7 +59,7 @@ Status Channel::init(RuntimeState* state) {
_brpc_request.set_sender_id(_parent->_sender_id);
_brpc_request.set_be_number(_be_number);
- _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) *
1000;
+ _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
_brpc_stub =
state->exec_env()->brpc_internal_client_cache()->get_client(
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index 5f7e874b09..d85318b297 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -102,7 +102,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender,
state->enable_pipeline_exec(),
- state->query_timeout()));
+ state->execution_timeout()));
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type,
state->fragment_instance_id(), _output_vexpr_ctxs,
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index af71b76280..b5840b22fd 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -63,7 +63,7 @@ Status VResultSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender,
state->enable_pipeline_exec(),
- state->query_timeout()));
+ state->execution_timeout()));
// create writer based on sink type
switch (_sink_type) {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 54f761955b..d21b4b4bf5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -222,7 +222,7 @@ Status VNodeChannel::init(RuntimeState* state) {
return Status::InternalError("get rpc stub failed");
}
- _rpc_timeout_ms = state->query_options().query_timeout * 1000;
+ _rpc_timeout_ms = state->execution_timeout() * 1000;
_timeout_watch.start();
// Initialize _cur_add_block_request
@@ -821,8 +821,8 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
- _is_high_priority = (state->query_options().query_timeout <=
- config::load_task_high_priority_threshold_second);
+ _is_high_priority =
+ (state->execution_timeout() <=
config::load_task_high_priority_threshold_second);
// profile must add to state's object pool
_profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
diff --git a/docs/en/docs/advanced/variables.md
b/docs/en/docs/advanced/variables.md
index b995bc3d44..cc434c6ff6 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -69,6 +69,7 @@ Variables that support both session-level and global-level
setting include:
* `sql_mode`
* `enable_profile`
* `query_timeout`
+* `insert_timeout`
* `exec_mem_limit`
* `batch_size`
* `parallel_fragment_exec_instance_num`
@@ -358,7 +359,11 @@ Translated with www.DeepL.com/Translator (free version)
* `query_timeout`
- Used to set the query timeout. This variable applies to all query
statements in the current connection, as well as INSERT statements. The default
is 5 minutes, in seconds.
+ Used to set the query timeout. This variable applies to all query
statements in the current connection. Particularly, timeout of INSERT
statements is recommended to be managed by the insert_timeout below. The
default is 5 minutes, in seconds.
+
+* `insert_timeout`
+ Used to set the insert timeout. This variable applies to INSERT statements
particularly in the current connection, and is recommended to manage
long-duration INSERT action. The default is 4 hours, in seconds. It will lose
effect when query_timeout is
+ greater than itself to make it compatible with the habits of older version
users to use query_timeout to control the timeout of INSERT statements.
* `resource_group`
diff --git a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
index bd00cbb55c..5337322588 100644
--- a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
@@ -189,7 +189,7 @@ This command returns the insert results and the details of
the corresponding tra
At present, Insert Into does not support custom import timeout time. All
Insert Into imports have a uniform timeout time. The default timeout time is 1
hour. If the imported source file cannot complete the import within the
specified time, the parameter `insert_load_default_timeout_second` of FE needs
to be adjusted.
- At the same time, the Insert Into statement receives the restriction of the
Session variable `query_timeout`. You can increase the timeout time by `SET
query_timeout = xxx;` in seconds.
+ At the same time, the Insert Into statement receives the restriction of the
Session variable `insert_timeout`. You can increase the timeout time by `SET
insert_timeout = xxx;` in seconds.
### Session Variables
@@ -199,7 +199,7 @@ This command returns the insert results and the details of
the corresponding tra
- query u timeout
- Insert Into itself is also an SQL command, so the Insert Into statement is
also restricted by the Session variable `query_timeout`. You can increase the
timeout time by `SET query_timeout = xxx;` in seconds.
+ Insert Into itself is also an SQL command, and the Insert Into statement is
restricted by the Session variable `insert_timeout`. You can increase the
timeout time by `SET insert_timeout = xxx;` in seconds.
## Best Practices
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
index eefd424c14..08cc77ab21 100644
---
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
@@ -226,7 +226,7 @@ Since the previous import methods of Doris are all
asynchronous import methods,
2. Timeout time
- The timeout for INSERT operations is controlled by [session
variable](../../../../advanced/variables.md) `query_timeout`. The default is 5
minutes. If it times out, the job will be canceled.
+ The timeout for INSERT operations is controlled by [session
variable](../../../../advanced/variables.md) `insert_timeout`. The default is 4
hours. If it times out, the job will be canceled.
3. Label and atomicity
diff --git
a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
index 4467c4aaff..41fa568ca8 100644
---
a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
+++
b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
@@ -58,6 +58,7 @@ Variables that support both the current session and the
global effect include:
- `sql_mode`
- `enable_profile`
- `query_timeout`
+- `insert_timeout`
- `exec_mem_limit`
- `batch_size`
- `allow_partition_column_nullable`
diff --git a/docs/zh-CN/docs/advanced/variables.md
b/docs/zh-CN/docs/advanced/variables.md
index dcbdb501e3..6bd438af60 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -71,6 +71,7 @@ SET GLOBAL exec_mem_limit = 137438953472
- `sql_mode`
- `enable_profile`
- `query_timeout`
+- `insert_timeout`
- `exec_mem_limit`
- `batch_size`
- `allow_partition_column_nullable`
@@ -355,7 +356,10 @@ SELECT /*+ SET_VAR(query_timeout = 1,
enable_partition_cache=true) */ sleep(3);
- `query_timeout`
- 用于设置查询超时。该变量会作用于当前连接中所有的查询语句,以及 INSERT 语句。默认为 5 分钟,单位为秒。
+ 用于设置查询超时。该变量会作用于当前连接中所有的查询语句,对于 INSERT 语句推荐使用insert_timeout。默认为 5 分钟,单位为秒。
+
+- `insert_timeout`
+ 用于设置针对 INSERT 语句的超时。该变量仅作用于 INSERT 语句,建议在 INSERT 行为易持续较长时间的场景下设置。默认为 4
小时,单位为秒。由于旧版本用户会通过延长 query_timeout 来防止 INSERT 语句超时,insert_timeout 在
query_timeout 大于自身的情况下将会失效, 以兼容旧版本用户的习惯。
- `resource_group`
diff --git
a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
index 198e79e02e..ff2f75e54c 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
@@ -192,7 +192,7 @@ TransactionStatus: VISIBLE
目前 Insert Into 并不支持自定义导入的 timeout 时间,所有 Insert Into 导入的超时时间是统一的,默认的 timeout
时间为1小时。如果导入的源文件无法在规定时间内完成导入,则需要调整 FE
的参数```insert_load_default_timeout_second```。
- 同时 Insert Into 语句受到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout =
xxx;` 来增加超时时间,单位是秒。
+ 同时 Insert Into 语句受到 Session 变量 `insert_timeout` 的限制。可以通过 `SET insert_timeout
= xxx;` 来增加超时时间,单位是秒。
### Session 变量
@@ -208,7 +208,7 @@ TransactionStatus: VISIBLE
+ query\_timeout
- Insert Into 本身也是一个 SQL 命令,因此 Insert Into 语句也受到 Session 变量 `query_timeout`
的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。
+ Insert Into 本身也是一个 SQL 命令,Insert Into 语句受到 Session 变量 `insert_timeout`
的限制。可以通过 `SET insert_timeout = xxx;` 来增加超时时间,单位是秒。
## 最佳实践
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
index d19592319c..33018fcd0d 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
@@ -226,7 +226,7 @@ INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from
test2;
2. 超时时间
- INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `query_timeout`
控制。默认为5分钟。超时则作业会被取消。
+ INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `insert_timeout`
控制。默认为4小时。超时则作业会被取消。
3. Label 和原子性
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
index 796ecf6660..6e77762319 100644
---
a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
@@ -58,6 +58,7 @@ SET variable_assignment [, variable_assignment] ...
- `sql_mode`
- `enable_profile`
- `query_timeout`
+- `insert_timeout`
- `exec_mem_limit`
- `batch_size`
- `allow_partition_column_nullable`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index a9269db687..bf587ab97d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -304,7 +304,7 @@ public class InsertStmt extends DdlStmt {
db =
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
// create label and begin transaction
- long timeoutSecond =
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+ long timeoutSecond = ConnectContext.get().getExecTimeout();
if (Strings.isNullOrEmpty(label)) {
label = "insert_" +
DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 9f4be5fdde..1b79fa6771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -152,6 +152,15 @@ public class ConnectContext {
private long userQueryTimeout;
+ /**
+ * the global execution timeout in seconds, currently set according to
query_timeout and insert_timeout.
+ * <p>
+ * when a connection is established, exec_timeout is set by query_timeout,
when the statement is an insert stmt,
+ * then it is set to max(query_timeout, insert_timeout) with {@link
#resetExecTimeout()} in
+ * {@link ConnectProcessor#handleQuery()} after the StmtExecutor is
specified.
+ */
+ private int executionTimeoutS;
+
public void setUserQueryTimeout(long queryTimeout) {
this.userQueryTimeout = queryTimeout;
}
@@ -164,7 +173,7 @@ public class ConnectContext {
}
public void setOrUpdateInsertResult(long txnId, String label, String db,
String tbl,
- TransactionStatus txnStatus, long
loadedRows, int filteredRows) {
+ TransactionStatus txnStatus, long loadedRows, int filteredRows) {
if (isTxnModel() && insertResult != null) {
insertResult.updateResult(txnStatus, loadedRows, filteredRows);
} else {
@@ -220,6 +229,8 @@ public class ConnectContext {
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
}
+ // initialize executionTimeoutS to default to queryTimeout
+ executionTimeoutS = sessionVariable.getQueryTimeoutS();
}
public boolean isTxnModel() {
@@ -568,7 +579,7 @@ public class ConnectContext {
boolean killFlag = false;
boolean killConnection = false;
if (command == MysqlCommand.COM_SLEEP) {
- if (delta > sessionVariable.getWaitTimeoutS() * 1000) {
+ if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
// Need kill this connection.
LOG.warn("kill wait timeout connection, remote: {}, wait
timeout: {}",
getMysqlChannel().getRemoteHostPortString(),
sessionVariable.getWaitTimeoutS());
@@ -577,25 +588,27 @@ public class ConnectContext {
killConnection = true;
}
} else {
+ long timeout;
+ String timeoutTag = "query";
if (userQueryTimeout > 0) {
// user set query_timeout property
- if (delta > userQueryTimeout * 1000) {
- LOG.warn("kill query timeout, remote: {}, query timeout:
{}",
- getMysqlChannel().getRemoteHostPortString(),
userQueryTimeout);
-
- killFlag = true;
- }
+ timeout = userQueryTimeout * 1000L;
} else {
- // default use session query_timeout
- if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
- LOG.warn("kill query timeout, remote: {}, query timeout:
{}",
- getMysqlChannel().getRemoteHostPortString(),
sessionVariable.getQueryTimeoutS());
+ //to ms
+ timeout = executionTimeoutS * 1000L;
+ }
+ //deal with insert stmt particularly
+ if (executor != null && executor.isInsertStmt()) {
+ timeoutTag = "insert";
+ }
- // Only kill
- killFlag = true;
- }
+ if (delta > timeout) {
+ LOG.warn("kill {} timeout, remote: {}, query timeout: {}",
+ timeoutTag,
getMysqlChannel().getRemoteHostPortString(), timeout);
+ killFlag = true;
}
}
+
if (killFlag) {
kill(killConnection);
}
@@ -635,6 +648,18 @@ public class ConnectContext {
return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp();
}
+ public void resetExecTimeout() {
+ if (executor != null && executor.isInsertStmt()) {
+ // particular timeout for insert stmt, we can make other
particular timeout in the same way.
+ // set the execution timeout as max(insert_timeout,query_timeout)
to be compatible with older versions
+ executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(),
executionTimeoutS);
+ }
+ }
+
+ public int getExecTimeout() {
+ return executionTimeoutS;
+ }
+
public class ThreadInfo {
public boolean isFull;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 00f047b318..2493ed2010 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -408,6 +408,8 @@ public class ConnectProcessor {
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
+ // reset the executionTimeout corresponding with the StmtExecutor
+ ctx.resetExecTimeout();
try {
executor.execute();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 77775c713c..4d29d94333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -235,7 +235,7 @@ public class Coordinator {
private final TUniqueId nextInstanceId;
// a timestamp represent the absolute timeout
- // eg, System.currentTimeMillis() + query_timeout * 1000
+ // eg, System.currentTimeMillis() + executeTimeoutS * 1000
private long timeoutDeadline;
private boolean enableShareHashTableForBroadcastJoin = false;
@@ -389,6 +389,7 @@ public class Coordinator {
this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline());
this.queryOptions.setBeExecVersion(Config.be_exec_version);
+ this.queryOptions.setExecutionTimeout(context.getExecTimeout());
}
public long getJobId() {
@@ -585,7 +586,7 @@ public class Coordinator {
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
DataSink topDataSink = topParams.fragment.getSink();
- this.timeoutDeadline = System.currentTimeMillis() +
queryOptions.query_timeout * 1000L;
+ this.timeoutDeadline = System.currentTimeMillis() +
queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof
ResultFileSink) {
TNetworkAddress execBeAddr =
topParams.instanceExecParams.get(0).host;
receiver = new
ResultReceiver(topParams.instanceExecParams.get(0).instanceId,
@@ -785,7 +786,7 @@ public class Coordinator {
String operation) throws RpcException, UserException {
if (leftTimeMs <= 0) {
throw new UserException("timeout before waiting for " + operation
+ " RPC. Elapse(sec): " + (
- (System.currentTimeMillis() - timeoutDeadline) / 1000 +
queryOptions.query_timeout));
+ (System.currentTimeMillis() - timeoutDeadline) / 1000 +
queryOptions.getExecutionTimeout()));
}
long timeoutMs = Math.min(leftTimeMs,
Config.remote_fragment_exec_timeout_ms);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 28c2028226..5be52b1097 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -57,6 +57,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String QUERY_TIMEOUT = "query_timeout";
+ public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
public static final String SQL_MODE = "sql_mode";
public static final String RESOURCE_VARIABLE = "resource_group";
@@ -293,6 +294,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = QUERY_TIMEOUT)
public int queryTimeoutS = 300;
+ @VariableMgr.VarAttr(name = INSERT_TIMEOUT)
+ public int insertTimeoutS = 14400;
+
// if true, need report to coordinator when plan fragment execute
successfully.
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;
@@ -820,6 +824,14 @@ public class SessionVariable implements Serializable,
Writable {
return queryTimeoutS;
}
+ public int getInsertTimeoutS() {
+ return insertTimeoutS;
+ }
+
+ public void setInsertTimeoutS(int insertTimeoutS) {
+ this.insertTimeoutS = insertTimeoutS;
+ }
+
public boolean enableProfile() {
return enableProfile;
}
@@ -1674,6 +1686,9 @@ public class SessionVariable implements Serializable,
Writable {
if (queryOptions.isSetQueryTimeout()) {
setQueryTimeoutS(queryOptions.getQueryTimeout());
}
+ if (queryOptions.isSetInsertTimeout()) {
+ setInsertTimeoutS(queryOptions.getInsertTimeout());
+ }
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 86bf8945a6..bef4253baf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -393,6 +393,10 @@ public class StmtExecutor implements ProfileWriter {
return parsedStmt != null && parsedStmt instanceof QueryStmt;
}
+ public boolean isInsertStmt() {
+ return parsedStmt != null && parsedStmt instanceof InsertStmt;
+ }
+
/**
* Used for audit in ConnectProcessor.
* <p>
@@ -1562,8 +1566,9 @@ public class StmtExecutor implements ProfileWriter {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
coord);
coord.exec();
-
- boolean notTimeout =
coord.join(context.getSessionVariable().getQueryTimeoutS());
+ int execTimeout = context.getExecTimeout();
+ LOG.debug("Insert execution timeout:{}", execTimeout);
+ boolean notTimeout = coord.join(execTimeout);
if (!coord.isDone()) {
coord.cancel();
if (notTimeout) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index f8c94694d4..fdee7a7a37 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -196,12 +196,16 @@ public class ConnectContextTest {
// sleep no time out
Assert.assertFalse(ctx.isKilled());
- long now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 - 1;
+ ctx.setExecutor(executor);
+ ctx.resetExecTimeout();
+ long now = ctx.getExecTimeout() * 1000L - 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
// Timeout
- now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + 1;
+ ctx.setExecutor(executor);
+ ctx.resetExecTimeout();
+ now = ctx.getExecTimeout() * 1000L + 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 947e83d95d..5193dd9a82 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -79,8 +79,10 @@ public class SessionVariablesTest extends TestWithFeService {
Assertions.assertTrue(queryOptions.isSetQueryTimeout());
queryOptions.setQueryTimeout(123);
+ queryOptions.setInsertTimeout(123);
sessionVariable.setForwardedSessionVariables(queryOptions);
Assertions.assertEquals(123, sessionVariable.getQueryTimeoutS());
+ Assertions.assertEquals(123, sessionVariable.getInsertTimeoutS());
}
@Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 1067a0c1a3..71740f061f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -64,6 +64,7 @@ public class VariableMgrTest {
long originExecMemLimit = var.getMaxExecMemByte();
boolean originEnableProfile = var.enableProfile();
long originQueryTimeOut = var.getQueryTimeoutS();
+ final int originInsertTimeout = var.getInsertTimeoutS();
List<List<String>> rows = VariableMgr.dump(SetType.SESSION, var, null);
Assert.assertTrue(rows.size() > 5);
@@ -76,6 +77,8 @@ public class VariableMgrTest {
Assert.assertEquals(String.valueOf(originQueryTimeOut),
row.get(1));
} else if (row.get(0).equalsIgnoreCase("sql_mode")) {
Assert.assertEquals("", row.get(1));
+ } else if (row.get(0).equalsIgnoreCase("insert_timeout")) {
+ Assert.assertEquals(String.valueOf(originInsertTimeout),
row.get(1));
}
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 65471d8f5c..f3befbb80a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -194,6 +194,10 @@ struct TQueryOptions {
59: optional i32 partitioned_hash_agg_rows_threshold = 0
60: optional bool enable_file_cache = true
+
+ 61: optional i32 insert_timeout = 14400
+
+ 62: optional i32 execution_timeout = 3600
}
diff --git a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
index b851c13c4d..6ed3a68b91 100644
--- a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
+++ b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
@@ -88,8 +88,8 @@ suite("load") {
def rowCount = sql "select count(*) from ${table}"
if (rowCount[0][0] != table_rows) {
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
- sql "set global query_timeout=3600"
- def r = sql "select @@query_timeout"
+ sql "set global insert_timeout=3600"
+ def r = sql "select @@insert_timeout"
assertEquals(3600, r[0][0])
year_cons = [
'lo_orderdate<19930101',
diff --git a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
index fe0cead6f4..5006e1a384 100644
--- a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
+++ b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
@@ -88,8 +88,8 @@ suite("load") {
def rowCount = sql "select count(*) from ${table}"
if (rowCount[0][0] != table_rows) {
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
- sql "set global query_timeout=3600"
- def r = sql "select @@query_timeout"
+ sql "set global insert_timeout=3600"
+ def r = sql "select @@insert_timeout"
assertEquals(3600, r[0][0])
year_cons = [
'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf0.1_p1/load.groovy
b/regression-test/suites/ssb_sf0.1_p1/load.groovy
index 6baf4d2bb0..0c81fb4a6f 100644
--- a/regression-test/suites/ssb_sf0.1_p1/load.groovy
+++ b/regression-test/suites/ssb_sf0.1_p1/load.groovy
@@ -87,8 +87,8 @@ suite("load") {
def rowCount = sql "select count(*) from ${table}"
if (rowCount[0][0] != table_rows) {
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
- sql "set global query_timeout=3600"
- def r = sql "select @@query_timeout"
+ sql "set global insert_timeout=3600"
+ def r = sql "select @@insert_timeout"
assertEquals(3600, r[0][0])
year_cons = [
'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf100_p2/load.groovy
b/regression-test/suites/ssb_sf100_p2/load.groovy
index 492e121027..bcf423f7d4 100644
--- a/regression-test/suites/ssb_sf100_p2/load.groovy
+++ b/regression-test/suites/ssb_sf100_p2/load.groovy
@@ -77,9 +77,9 @@ suite('load') {
def rowCount = sql "select count(*) from ${table}"
if (rowCount[0][0] != table_rows) {
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
- sql "set global query_timeout=3600"
+ sql "set global insert_timeout=3600"
sql "sync"
- def r = sql "select @@query_timeout"
+ def r = sql "select @@insert_timeout"
assertEquals(3600, r[0][0])
year_cons = [
'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf1_p2/load.groovy
b/regression-test/suites/ssb_sf1_p2/load.groovy
index fdaaad5c3b..ee0163b641 100644
--- a/regression-test/suites/ssb_sf1_p2/load.groovy
+++ b/regression-test/suites/ssb_sf1_p2/load.groovy
@@ -89,8 +89,8 @@ suite("load") {
def rowCount = sql "select count(*) from ${table}"
if (rowCount[0][0] != table_rows) {
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
- sql "set global query_timeout=3600"
- def r = sql "select @@query_timeout"
+ sql "set global insert_timeout=3600"
+ def r = sql "select @@insert_timeout"
assertEquals(3600, r[0][0])
year_cons = [
'lo_orderdate<19930101',
diff --git a/tools/ssb-tools/bin/load-ssb-data.sh
b/tools/ssb-tools/bin/load-ssb-data.sh
index be5c550f93..db958b3921 100755
--- a/tools/ssb-tools/bin/load-ssb-data.sh
+++ b/tools/ssb-tools/bin/load-ssb-data.sh
@@ -257,16 +257,12 @@ date
echo "==========Start to insert data into ssb flat table=========="
echo "change some session variables before load, and then restore after load."
-origin_query_timeout=$(
- set -e
- run_sql 'select @@query_timeout;' | sed -n '3p'
-)
origin_parallel=$(
set -e
run_sql 'select @@parallel_fragment_exec_instance_num;' | sed -n '3p'
)
# set parallel_fragment_exec_instance_num=1, loading maybe slow but stable.
-run_sql "set global query_timeout=7200;"
+run_sql "set global insert_timeout=7200;"
run_sql "set global parallel_fragment_exec_instance_num=1;"
echo '============================================'
date
@@ -274,7 +270,6 @@ load_lineitem_flat
date
echo '============================================'
echo "restore session variables"
-run_sql "set global query_timeout=${origin_query_timeout};"
run_sql "set global parallel_fragment_exec_instance_num=${origin_parallel};"
echo '============================================'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]