This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28292e37c62 [fix](load) fix no error url for stream load (#38325)
28292e37c62 is described below
commit 28292e37c62c3390794517b8b156ae4b34fce795
Author: Xin Liao <[email protected]>
AuthorDate: Thu Jul 25 10:00:26 2024 +0800
[fix](load) fix no error url for stream load (#38325)
Previously, it was determined whether to output an error URL based on
whether num_filtered_rows was greater than 0. However, in abnormal
situations, the statistics of num_filtered_rows may not be accurate,
resulting in no error URL output. As long as there is data in the error
log file indicating filtered rows, the URL can be returned to the client
without the need to determine the number of filtered rows.
---
.../runtime/stream_load/stream_load_executor.cpp | 4 +--
.../load_p0/stream_load/test_stream_load.groovy | 40 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 2bd1c16199d..28b0556aafd 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -90,9 +90,7 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
// some users may rely on this error message.
*status = Status::DataQualityError("too many filtered rows");
}
- if (ctx->number_filtered_rows > 0 &&
!state->get_error_log_file_path().empty()) {
- ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
- }
+ ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
if (status->ok()) {
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 660ad50bd9d..f089861b76e 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -66,6 +66,15 @@ suite("test_stream_load", "p0") {
file 'test_strict_mode.csv'
time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
+ }
}
sql "sync"
@@ -90,6 +99,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(1, json.NumberFilteredRows)
}
@@ -117,6 +127,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(1, json.NumberFilteredRows)
}
@@ -160,6 +171,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(3, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
@@ -210,6 +222,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(1, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
@@ -428,6 +441,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
sql "sync"
@@ -453,6 +467,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
sql "sync"
@@ -474,6 +489,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(4, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
@@ -497,6 +513,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2500, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
@@ -524,6 +541,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
assertEquals(0, json.NumberLoadedRows)
}
}
@@ -547,6 +565,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2500, json.NumberTotalRows)
assertEquals(2500, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -579,6 +598,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2500, json.NumberTotalRows)
assertEquals(11, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -609,6 +629,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2500, json.NumberTotalRows)
assertEquals(2500, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -639,6 +660,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
assertEquals(2500, json.NumberTotalRows)
assertEquals(0, json.NumberLoadedRows)
assertEquals(2500, json.NumberFilteredRows)
@@ -695,6 +717,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(1025, json.NumberTotalRows)
assertEquals(1025, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -723,6 +746,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(8001, json.NumberTotalRows)
assertEquals(8001, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -754,6 +778,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(6, json.NumberTotalRows)
assertEquals(6, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -782,6 +807,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(5, json.NumberTotalRows)
assertEquals(5, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -834,6 +860,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(10, json.NumberTotalRows)
assertEquals(10, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -884,6 +911,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(5, json.NumberTotalRows)
assertEquals(5, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -936,6 +964,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(14, json.NumberTotalRows)
assertEquals(14, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
@@ -990,6 +1019,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(11, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(5, json.NumberUnselectedRows)
@@ -1050,6 +1080,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
@@ -1092,6 +1123,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
@@ -1233,6 +1265,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
@@ -1262,6 +1295,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
assertEquals(2, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
@@ -1379,6 +1413,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
sql "sync"
@@ -1400,6 +1435,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
sql "sync"
@@ -1456,6 +1492,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
@@ -1473,6 +1510,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
@@ -1492,6 +1530,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
+ assertTrue(!result.contains("ErrorURL"))
}
}
sql "sync"
@@ -1575,6 +1614,7 @@ suite("test_stream_load", "p0") {
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(result.contains("ErrorURL"))
assertTrue(json.Message.contains("Encountered unqualified
data, stop processing"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]