This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d52a426002e [fix](pipeline-load) fix missing error URL when a data
quality error occurs and total rows is negative (#34072)
d52a426002e is described below
commit d52a426002ec912516a592304146e5da2ac5997d
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat Apr 27 16:21:16 2024 +0800
[fix](pipeline-load) fix missing error URL when a data quality error
occurs and total rows is negative (#34072)
---
.../runtime/stream_load/stream_load_executor.cpp | 47 +++++--------
be/src/vec/sink/writer/vtablet_writer.cpp | 12 ++--
.../data/load_p0/stream_load/test_error_url.csv | 9 +++
.../load_p0/stream_load/test_pipeline_load.groovy | 2 +-
.../stream_load/test_stream_load_error_url.groovy | 76 ++++++++++++++++++++++
.../test_partial_update_schema_change.groovy | 4 +-
...t_partial_update_schema_change_row_store.groovy | 4 +-
7 files changed, 113 insertions(+), 41 deletions(-)
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 720c2e86898..58621c77a2a 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -78,38 +78,25 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
- if (status->ok()) {
- ctx->number_total_rows = state->num_rows_load_total();
- ctx->number_loaded_rows = state->num_rows_load_success();
- ctx->number_filtered_rows = state->num_rows_load_filtered();
- ctx->number_unselected_rows = state->num_rows_load_unselected();
-
- int64_t num_selected_rows = ctx->number_total_rows -
ctx->number_unselected_rows;
- if (!ctx->group_commit && num_selected_rows > 0 &&
- (double)ctx->number_filtered_rows / num_selected_rows >
ctx->max_filter_ratio) {
- // NOTE: Do not modify the error message here, for historical
reasons,
- // some users may rely on this error message.
- *status = Status::DataQualityError("too many filtered rows");
- }
- if (ctx->number_filtered_rows > 0 &&
!state->get_error_log_file_path().empty()) {
- ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
- }
+ ctx->number_total_rows = state->num_rows_load_total();
+ ctx->number_loaded_rows = state->num_rows_load_success();
+ ctx->number_filtered_rows = state->num_rows_load_filtered();
+ ctx->number_unselected_rows = state->num_rows_load_unselected();
+ int64_t num_selected_rows = ctx->number_total_rows -
ctx->number_unselected_rows;
+ if (!ctx->group_commit && num_selected_rows > 0 &&
+ (double)ctx->number_filtered_rows / num_selected_rows >
ctx->max_filter_ratio) {
+ // NOTE: Do not modify the error message here, for historical
reasons,
+ // some users may rely on this error message.
+ *status = Status::DataQualityError("too many filtered rows");
+ }
+ if (ctx->number_filtered_rows > 0 &&
!state->get_error_log_file_path().empty()) {
+ ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
+ }
- if (status->ok()) {
-
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
- DorisMetrics::instance()->stream_load_rows_total->increment(
- ctx->number_loaded_rows);
- }
+ if (status->ok()) {
+
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
+
DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows);
} else {
- if (ctx->group_commit) {
- ctx->number_total_rows = state->num_rows_load_total();
- ctx->number_loaded_rows = state->num_rows_load_success();
- ctx->number_filtered_rows = state->num_rows_load_filtered();
- ctx->number_unselected_rows =
state->num_rows_load_unselected();
- if (ctx->number_filtered_rows > 0 &&
!state->get_error_log_file_path().empty()) {
- ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
- }
- }
LOG(WARNING) << "fragment execute failed"
<< ", err_msg=" << status->to_string() << ", " <<
ctx->brief();
// cancel body_sink, make sender known it
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 6f0c4a3e2a8..fdd683f42ce 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1676,6 +1676,12 @@ Status VTabletWriter::write(doris::vectorized::Block&
input_block) {
bool has_filtered_rows = false;
int64_t filtered_rows = 0;
_number_input_rows += rows;
+ // update incrementally so that FE can get the progress.
+ // the real 'num_rows_load_total' will be set when sink being closed.
+ _state->update_num_rows_load_total(rows);
+ _state->update_num_bytes_load_total(bytes);
+ DorisMetrics::instance()->load_rows->increment(rows);
+ DorisMetrics::instance()->load_bytes->increment(bytes);
_row_distribution_watch.start();
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
@@ -1688,12 +1694,6 @@ Status VTabletWriter::write(doris::vectorized::Block&
input_block) {
_generate_index_channels_payloads(_row_part_tablet_ids,
channel_to_payload);
_row_distribution_watch.stop();
- // update incrementally so that FE can get the progress.
- // the real 'num_rows_load_total' will be set when sink being closed.
- _state->update_num_rows_load_total(rows);
- _state->update_num_bytes_load_total(bytes);
- DorisMetrics::instance()->load_rows->increment(rows);
- DorisMetrics::instance()->load_bytes->increment(bytes);
// Random distribution and the block belongs to a single tablet, we could
optimize to append the whole
// block into node channel.
bool load_block_to_single_tablet =
diff --git a/regression-test/data/load_p0/stream_load/test_error_url.csv
b/regression-test/data/load_p0/stream_load/test_error_url.csv
new file mode 100644
index 00000000000..a1c8c042f11
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_error_url.csv
@@ -0,0 +1,9 @@
+281747159,apple-store,2009-08-07,price_change,2.99,1.99,"","",2024-02-02T14:08:16+08:00
+281747159,apple-store,2009-08-13,version_change,1.1.2 (iPhone OS 3.0
Tested),1.1.1 (iPhone OS 3.0 Tested),"","",2024-02-02T14:08:16+08:00
+281790044,apple-store,2009-08-06,version_change,3.0 (iPhone OS 3.0
Tested),1.3.1 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
+281790044,apple-store,2009-08-17,version_change,3.0.1 (iPhone OS 3.0
Tested),3.0 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
+281796108,apple-store,2009-08-24,version_change,3.1.0 (iPhone OS 3.0
Tested),3.0.2 (iPhone OS 3.0 Tested),"","",2024-02-10T17:48:26+08:00
+281941097,apple-store,2009-08-15,version_change,2.6.0 (iPhone OS 3.0
Tested),2.5.0,"","",2024-02-17T11:15:40+08:00
+281941097,apple-store,2009-08-22,version_change,2.6.3 (iPhone OS 3.0
Tested),2.6.0 (iPhone OS 3.0 Tested),"","",2024-02-17T11:15:40+08:00
+282614216,apple-store,2009-08-12,version_change,1.4.0 (iPhone OS 3.0
Tested),1.3.0 (iPhone OS 3.0 Tested),"","",2024-02-21T21:57:58+08:00
+282738621,apple-store,2009-08-12,name_change,ZHI Chinese-English
Dictionary,Chinese English Dictionary,"","",2024-02-17T14:20:44+08:00
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
index b8978a15ca9..472176a519d 100644
--- a/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_pipeline_load.groovy
@@ -152,7 +152,7 @@ suite("test_pipeline_load", "nonConcurrent") {
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("Encountered unqualified
data"))
- assertEquals(0, json.NumberTotalRows)
+ assertEquals(100, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
new file mode 100644
index 00000000000..72fc212e241
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_stream_load_error_url.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_error_url", "p0") {
+ def tableName = "test_stream_load_error_url"
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `product_id` BIGINT NULL,
+ `market_code` VARCHAR(32),
+ `date` DATE NULL,
+ `event_type` VARCHAR(255) NULL,
+ `new_value` TEXT NULL,
+ `old_value` TEXT NULL,
+ `release_note` TEXT NULL,
+ `release_date` TEXT NULL,
+ `u_time` DATETIME NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`product_id`, `market_code`, `date`, `event_type`)
+ COMMENT 'test_error_url'
+ PARTITION BY RANGE(`date`)
+ (PARTITION p_201001 VALUES [('2010-01-01'), ('2012-01-01')),
+ PARTITION p_201201 VALUES [('2012-01-01'), ('2014-01-01')),
+ PARTITION p_201401 VALUES [('2014-01-01'), ('2016-01-01')),
+ PARTITION p_201601 VALUES [('2016-01-01'), ('2018-01-01')),
+ PARTITION p_201801 VALUES [('2018-01-01'), ('2020-01-01')),
+ PARTITION p_202001 VALUES [('2020-01-01'), ('2022-01-01')),
+ PARTITION p_202201 VALUES [('2022-01-01'), ('2024-01-01')),
+ PARTITION p_202401 VALUES [('2024-01-01'), ('2026-01-01')),
+ PARTITION p_202601 VALUES [('2026-01-01'), ('2028-01-01')),
+ PARTITION p_202801 VALUES [('2028-01-01'), ('2028-12-01')))
+ DISTRIBUTED BY HASH(`product_id`, `market_code`, `date`,
`event_type`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'columns', 'k1, k2, k3'
+ file 'test_error_url.csv'
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many
filtered rows"))
+ def (code, out, err) = curl("GET", json.ErrorURL)
+ log.info("error result: " + out)
+ assertTrue(out.contains("actual column number in csv file is
more than schema column number.actual number"))
+ log.info("url: " + json.ErrorURL)
+ }
+ }
+ } finally {
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ }
+}
\ No newline at end of file
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index 347cafd499a..820c8a5b09c 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -440,7 +440,7 @@ suite("test_partial_update_schema_change", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
+ assertEquals(1, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
@@ -1035,7 +1035,7 @@ suite("test_partial_update_schema_change", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
+ assertEquals(1, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
index a9480b664de..3ea3c7a613e 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
@@ -444,7 +444,7 @@ suite("test_partial_update_row_store_schema_change", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
+ assertEquals(1, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
@@ -1045,7 +1045,7 @@ suite("test_partial_update_row_store_schema_change",
"p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
- assertEquals(0, json.NumberTotalRows)
+ assertEquals(1, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]