This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c68fab2dd6e9f8febe663cfb0c0b2680026618e7
Author: Xin Liao <[email protected]>
AuthorDate: Thu Jul 25 10:00:26 2024 +0800

    [fix](load) fix no error url for stream load (#38325)
    
    Previously, it was determined whether to output an error URL based on
    whether num_filtered_rows was greater than 0. However, in abnormal
    situations, the statistics of num_filtered_rows may not be accurate,
    resulting in no error URL output. As long as there is data in the error
    log file indicating filtered rows, the URL can be returned to the client
    without the need to determine the number of filtered rows.
---
 .../runtime/stream_load/stream_load_executor.cpp   |  4 +--
 .../load_p0/stream_load/test_stream_load.groovy    | 40 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 2bd1c16199d..28b0556aafd 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -90,9 +90,7 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             // some users may rely on this error message.
             *status = Status::DataQualityError("too many filtered rows");
         }
-        if (ctx->number_filtered_rows > 0 && 
!state->get_error_log_file_path().empty()) {
-            ctx->error_url = 
to_load_error_http_path(state->get_error_log_file_path());
-        }
+        ctx->error_url = 
to_load_error_http_path(state->get_error_log_file_path());
 
         if (status->ok()) {
             
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy 
b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 660ad50bd9d..f089861b76e 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -66,6 +66,15 @@ suite("test_stream_load", "p0") {
 
         file 'test_strict_mode.csv'
         time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
+        }
     }
 
     sql "sync"
@@ -90,6 +99,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(result.contains("ErrorURL"))
             assertEquals(2, json.NumberTotalRows)
             assertEquals(1, json.NumberFilteredRows)
         }
@@ -117,6 +127,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(result.contains("ErrorURL"))
             assertEquals(2, json.NumberTotalRows)
             assertEquals(1, json.NumberFilteredRows)
         }
@@ -160,6 +171,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(3, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
         }
@@ -210,6 +222,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(1, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
         }
@@ -428,6 +441,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
     sql "sync"
@@ -453,6 +467,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
     sql "sync"
@@ -474,6 +489,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(4, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
         }
@@ -497,6 +513,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2500, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
         }
@@ -524,6 +541,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(result.contains("ErrorURL"))
             assertEquals(0, json.NumberLoadedRows)
         }
     }
@@ -547,6 +565,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2500, json.NumberTotalRows)
             assertEquals(2500, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -579,6 +598,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2500, json.NumberTotalRows)
             assertEquals(11, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -609,6 +629,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2500, json.NumberTotalRows)
             assertEquals(2500, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -639,6 +660,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(result.contains("ErrorURL"))
             assertEquals(2500, json.NumberTotalRows)
             assertEquals(0, json.NumberLoadedRows)
             assertEquals(2500, json.NumberFilteredRows)
@@ -695,6 +717,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(1025, json.NumberTotalRows)
             assertEquals(1025, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -723,6 +746,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(8001, json.NumberTotalRows)
             assertEquals(8001, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -754,6 +778,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(6, json.NumberTotalRows)
             assertEquals(6, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -782,6 +807,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(5, json.NumberTotalRows)
             assertEquals(5, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -834,6 +860,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(10, json.NumberTotalRows)
             assertEquals(10, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -884,6 +911,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(5, json.NumberTotalRows)
             assertEquals(5, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -936,6 +964,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(14, json.NumberTotalRows)
             assertEquals(14, json.NumberLoadedRows)
             assertEquals(0, json.NumberFilteredRows)
@@ -990,6 +1019,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(11, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(5, json.NumberUnselectedRows)
@@ -1050,6 +1080,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
@@ -1092,6 +1123,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
             assertEquals(2, json.NumberTotalRows)
             assertEquals(0, json.NumberFilteredRows)
             assertEquals(0, json.NumberUnselectedRows)
@@ -1233,6 +1265,7 @@ suite("test_stream_load", "p0") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("success", json.Status.toLowerCase())
+                assertTrue(!result.contains("ErrorURL"))
                 assertEquals(2, json.NumberTotalRows)
                 assertEquals(0, json.NumberFilteredRows)
                 assertEquals(0, json.NumberUnselectedRows)
@@ -1262,6 +1295,7 @@ suite("test_stream_load", "p0") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("success", json.Status.toLowerCase())
+                assertTrue(!result.contains("ErrorURL"))
                 assertEquals(2, json.NumberTotalRows)
                 assertEquals(0, json.NumberFilteredRows)
                 assertEquals(0, json.NumberUnselectedRows)
@@ -1379,6 +1413,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
     sql "sync"
@@ -1400,6 +1435,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
     sql "sync"
@@ -1456,6 +1492,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
 
@@ -1473,6 +1510,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
 
@@ -1492,6 +1530,7 @@ suite("test_stream_load", "p0") {
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
             assertEquals("success", json.Status.toLowerCase())
+            assertTrue(!result.contains("ErrorURL"))
         }
     }
     sql "sync"
@@ -1575,6 +1614,7 @@ suite("test_stream_load", "p0") {
                 log.info("Stream load result: ${result}".toString())
                 def json = parseJson(result)
                 assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(result.contains("ErrorURL"))
                 assertTrue(json.Message.contains("Encountered unqualified 
data, stop processing"))
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to