This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 38114ce1e86 [fix](merge-cloud) fix single rowset not triggering
compaction in cloud mode (#34622)
38114ce1e86 is described below
commit 38114ce1e860799a3100a6395211e8dbcc27fb63
Author: Luwei <[email protected]>
AuthorDate: Fri May 10 10:42:34 2024 +0800
[fix](merge-cloud) fix single rowset not triggering compaction in cloud
mode (#34622)
---
.../cloud/cloud_cumulative_compaction_policy.cpp | 11 ++--
.../suites/compaction/test_base_compaction.groovy | 74 ++++++++++++---------
.../test_base_compaction_no_value.groovy | 76 ++++++++++++----------
3 files changed, 92 insertions(+), 69 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index d040b21d421..1c658e5d546 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,7 +54,7 @@ int32_t
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
size_t* compaction_score, bool allow_delete) {
- //size_t promotion_size = tablet->cumulative_promotion_size();
+ size_t promotion_size = cloud_promotion_size(tablet);
auto max_version = tablet->max_version().first;
int transient_size = 0;
*compaction_score = 0;
@@ -93,6 +93,10 @@ int32_t
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
}
+ if (total_size >= promotion_size) {
+ return transient_size;
+ }
+
// if there is delete version, do compaction directly
if (last_delete_version->first != -1) {
if (input_rowsets->size() == 1) {
@@ -154,9 +158,8 @@ int32_t
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
*compaction_score = new_compaction_score;
VLOG_CRITICAL << "cumulative compaction size_based policy,
compaction_score = "
- << *compaction_score << ", total_size = "
- << total_size
- //<< ", calc promotion size value = " << promotion_size
+ << *compaction_score << ", total_size = " << total_size
+ << ", calc promotion size value = " << promotion_size
<< ", tablet = " << tablet->tablet_id() << ", input_rowset
size "
<< input_rowsets->size();
diff --git a/regression-test/suites/compaction/test_base_compaction.groovy
b/regression-test/suites/compaction/test_base_compaction.groovy
index d0c8074ca4c..5aae1ecdef7 100644
--- a/regression-test/suites/compaction/test_base_compaction.groovy
+++ b/regression-test/suites/compaction/test_base_compaction.groovy
@@ -33,14 +33,6 @@ suite("test_base_compaction") {
def configList = parseJson(out.trim())
assert configList instanceof List
- boolean disableAutoCompaction = true
- for (Object ele in (List) configList) {
- assert ele instanceof List<String>
- if (((List<String>) ele)[0] == "disable_auto_compaction") {
- disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
- }
- }
-
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction") {
UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
PROPERTIES (
- "replication_num" = "1"
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
)
"""
@@ -84,7 +77,42 @@ suite("test_base_compaction") {
// relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
- file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+ time 10000 // limit inflight 10s
+
+ // stream load action will check result, include Success status, and
NumberTotalRows == NumberLoadedRows
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ // a default db 'regression_test' is specified in
+ // ${DORIS_HOME}/conf/regression-conf.groovy
+ table tableName
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specify in doris fe config, usually is
'\t'.
+ // this line change to ','
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+ // also, you can stream load a http stream, e.g. http://xxx/some.csv
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
time 10000 // limit inflight 10s
@@ -114,13 +142,7 @@ suite("test_base_compaction") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction") {
// relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
- file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
time 10000 // limit inflight 10s
@@ -182,13 +204,7 @@ suite("test_base_compaction") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
diff --git
a/regression-test/suites/compaction/test_base_compaction_no_value.groovy
b/regression-test/suites/compaction/test_base_compaction_no_value.groovy
index 81ce0cd8263..91a50ce4dcd 100644
--- a/regression-test/suites/compaction/test_base_compaction_no_value.groovy
+++ b/regression-test/suites/compaction/test_base_compaction_no_value.groovy
@@ -27,20 +27,12 @@ suite("test_base_compaction_no_value") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
-
+
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
- boolean disableAutoCompaction = true
- for (Object ele in (List) configList) {
- assert ele instanceof List<String>
- if (((List<String>) ele)[0] == "disable_auto_compaction") {
- disableAutoCompaction = Boolean.parseBoolean(((List<String>)
ele)[2])
- }
- }
-
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction_no_value") {
UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY,
L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,
L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
PROPERTIES (
- "replication_num" = "1"
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
)
"""
@@ -84,7 +77,42 @@ suite("test_base_compaction_no_value") {
// relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
- file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+ time 10000 // limit inflight 10s
+
+ // stream load action will check result, include Success status, and
NumberTotalRows == NumberLoadedRows
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ // a default db 'regression_test' is specified in
+ // ${DORIS_HOME}/conf/regression-conf.groovy
+ table tableName
+
+ // default label is UUID:
+ // set 'label' UUID.randomUUID().toString()
+
+ // default column_separator is specify in doris fe config, usually is
'\t'.
+ // this line change to ','
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+
+ // relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+ // also, you can stream load a http stream, e.g. http://xxx/some.csv
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
time 10000 // limit inflight 10s
@@ -114,13 +142,7 @@ suite("test_base_compaction_no_value") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction_no_value") {
// relate to
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
- file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+ file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
time 10000 // limit inflight 10s
@@ -182,13 +204,7 @@ suite("test_base_compaction_no_value") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction_no_value") {
logger.info("Run compaction: code=" + code + ", out=" + out + ", err="
+ err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
- if (compactJson.status.toLowerCase() == "fail") {
- assertEquals(disableAutoCompaction, false)
- logger.info("Compaction was done automatically!")
- }
- if (disableAutoCompaction) {
- assertEquals("success", compactJson.status.toLowerCase())
- }
+ assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]