This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 49dec9f39d1 [branch-2.1] Picks "[opt](merge-on-write) Reduce the version not continuous logs for merge-on-write table #40946" (#40996)
49dec9f39d1 is described below
commit 49dec9f39d1af8e4efb30f08d0c2d871c8d86f72
Author: bobhan1 <[email protected]>
AuthorDate: Thu Sep 19 23:58:05 2024 +0800
[branch-2.1] Picks "[opt](merge-on-write) Reduce the version not continuous logs for merge-on-write table #40946" (#40996)
picks https://github.com/apache/doris/pull/40946
---
be/src/common/config.cpp | 4 ++++
be/src/common/config.h | 4 ++++
be/src/olap/task/engine_publish_version_task.cpp | 26 +++++++++++++---------
.../main/java/org/apache/doris/common/Config.java | 4 ++++
.../java/org/apache/doris/master/MasterImpl.java | 14 +++++++++---
5 files changed, 39 insertions(+), 13 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 473842a1886..df720853c1f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1134,6 +1134,10 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
+// When the version is not continuous for MOW table in publish phase and the gap between
+// current txn's publishing version and the max version of the tablet exceeds this value,
+// don't print warning log
+DEFINE_mInt32(publish_version_gap_logging_threshold, "200");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a3d4b35dce8..4b158f1e45e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1202,6 +1202,10 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
+// When the version is not continuous for MOW table in publish phase and the gap between
+// current txn's publishing version and the max version of the tablet exceeds this value,
+// don't print warning log
+DECLARE_mInt32(publish_version_gap_logging_threshold);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 9601cad88d1..66721f56237 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -231,16 +231,22 @@ Status EnginePublishVersionTask::execute() {
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
- auto msg = fmt::format(
- "uniq key with merge-on-write version not continuous, "
- "missed version={}, it's transaction_id={}, current publish "
- "version={}, tablet_id={}, transaction_id={}",
- missed_version, missed_txn_id, version.second, tablet->tablet_id(),
- _publish_version_req.transaction_id);
- if (first_time_update) {
- LOG(INFO) << msg;
- } else {
- LOG_EVERY_SECOND(INFO) << msg;
+ bool need_log =
+ (config::publish_version_gap_logging_threshold < 0 ||
+ max_version + config::publish_version_gap_logging_threshold >=
+ version.second);
+ if (need_log) {
+ auto msg = fmt::format(
+ "uniq key with merge-on-write version not continuous, "
+ "missed version={}, it's transaction_id={}, current publish "
+ "version={}, tablet_id={}, transaction_id={}",
+ missed_version, missed_txn_id, version.second,
+ tablet->tablet_id(), _publish_version_req.transaction_id);
+ if (first_time_update) {
+ LOG(INFO) << msg;
+ } else {
+ LOG_EVERY_SECOND(INFO) << msg;
+ }
}
};
// The versions during the schema change period need to be also continuous
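For readers skimming the patch, the BE-side gate added above can be read in isolation as the standalone C++ sketch below. The helper name and the sample numbers are illustrative only; only the condition itself mirrors the diff.

    #include <cstdint>
    #include <iostream>

    // Sketch of the gating rule from engine_publish_version_task.cpp above:
    // a negative threshold keeps the old behavior (always log), otherwise the
    // warning is emitted only while the publishing version is within
    // `threshold` versions of the tablet's current max version.
    bool should_log_version_gap(int64_t max_version, int64_t publish_version, int32_t threshold) {
        return threshold < 0 || max_version + threshold >= publish_version;
    }

    int main() {
        // Made-up numbers: tablet max version 100, threshold 200 (the new default).
        std::cout << should_log_version_gap(100, 250, 200) << "\n"; // 1: gap of 150, still logged
        std::cout << should_log_version_gap(100, 350, 200) << "\n"; // 0: gap of 250, warning suppressed
        std::cout << should_log_version_gap(100, 350, -1) << "\n";  // 1: negative threshold disables suppression
        return 0;
    }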
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b9eefd839ea..3bf55f3da40 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -494,6 +494,10 @@ public class Config extends ConfigBase {
"print log interval for publish transaction failed interval"})
public static long publish_fail_log_interval_second = 5 * 60;
+ @ConfField(mutable = true, masterOnly = true, description = {"一个 PUBLISH_VERSION 任务打印失败日志的次数上限",
+ "the upper limit of failure logs of PUBLISH_VERSION task"})
+ public static long publish_version_task_failed_log_threshold = 80;
+
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 92479534241..4870b3a5820 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
@@ -86,11 +87,13 @@ public class MasterImpl {
// check task status
// retry task by report process
TStatus taskStatus = request.getTaskStatus();
+ TTaskType taskType = request.getTaskType();
+ long signature = request.getSignature();
if (LOG.isDebugEnabled()) {
LOG.debug("get task report: {}", request);
}
- if (taskStatus.getStatusCode() != TStatusCode.OK) {
+ if (taskStatus.getStatusCode() != TStatusCode.OK && taskType != TTaskType.PUBLISH_VERSION) {
LOG.warn("finish task reports bad. request: {}", request);
}
@@ -109,8 +112,6 @@ public class MasterImpl {
}
long backendId = backend.getId();
- TTaskType taskType = request.getTaskType();
- long signature = request.getSignature();
AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
if (task == null) {
@@ -128,6 +129,13 @@ public class MasterImpl {
} else {
if (taskStatus.getStatusCode() != TStatusCode.OK) {
task.failed();
+ if (taskType == TTaskType.PUBLISH_VERSION) {
+ boolean needLog = (Config.publish_version_task_failed_log_threshold < 0
+ || task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold);
+ if (needLog) {
+ LOG.warn("finish task reports bad. request: {}", request);
+ }
+ }
String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString()
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
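Similarly, the FE-side suppression added in MasterImpl.java boils down to the check sketched below. This is again a standalone C++ illustration with made-up failure counts, not the Doris code itself; only the threshold comparison mirrors the Java change.

    #include <cstdint>
    #include <iostream>

    // Sketch of the FE-side rule from MasterImpl.java above: once a
    // PUBLISH_VERSION task has failed more than
    // publish_version_task_failed_log_threshold times, the
    // "finish task reports bad" warning is dropped for that task; a negative
    // threshold keeps logging every failure.
    bool should_log_failed_publish_task(int64_t failed_times, int64_t threshold) {
        return threshold < 0 || failed_times <= threshold;
    }

    int main() {
        const int64_t threshold = 80; // default of publish_version_task_failed_log_threshold
        std::cout << should_log_failed_publish_task(80, threshold) << "\n"; // 1: within the limit, logged
        std::cout << should_log_failed_publish_task(81, threshold) << "\n"; // 0: past the limit, suppressed
        std::cout << should_log_failed_publish_task(81, -1) << "\n";        // 1: negative threshold logs everything
        return 0;
    }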
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]