This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 61de49c727d [case](regression) Test duplicated load id (#28251)
61de49c727d is described below
commit 61de49c727d6cfbf8b690f9dfcbbfd597fd7170b
Author: HowardQin <[email protected]>
AuthorDate: Sat Dec 16 22:41:51 2023 +0800
[case](regression) Test duplicated load id (#28251)
Co-authored-by: qinhao <[email protected]>
---
be/src/runtime/stream_load/new_load_stream_mgr.h | 3 +
.../fault_injection_p0/test_duplicated_load_id.out | 4 ++
.../test_duplicated_load_id.groovy | 75 ++++++++++++++++++++++
3 files changed, 82 insertions(+)
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h
index 61e4010a2c9..d745fe66a53 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -27,6 +27,7 @@
#include "common/factory_creator.h"
#include "common/logging.h"
#include "common/status.h"
+#include "util/debug_points.h"
#include "util/uid_util.h"
namespace doris {
@@ -44,6 +45,8 @@ public:
Status put(const UniqueId& id, std::shared_ptr<StreamLoadContext> stream) {
{
std::lock_guard<std::mutex> l(_lock);
+ DBUG_EXECUTE_IF("NewLoadStreamMgr.test_duplicated_load_id",
+ _stream_map.emplace(id, stream));
if (auto iter = _stream_map.find(id); iter != _stream_map.end()) {
std::stringstream ss;
ss << "id: " << id << " already exist";
diff --git a/regression-test/data/fault_injection_p0/test_duplicated_load_id.out b/regression-test/data/fault_injection_p0/test_duplicated_load_id.out
new file mode 100644
index 00000000000..a21f3b8748e
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_duplicated_load_id.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+0
+
diff --git a/regression-test/suites/fault_injection_p0/test_duplicated_load_id.groovy b/regression-test/suites/fault_injection_p0/test_duplicated_load_id.groovy
new file mode 100644
index 00000000000..1d5327fdc4e
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_duplicated_load_id.groovy
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_duplicated_load_id", "nonConcurrent") {
+
+ def create_httplogs_dup_table = {testTablex ->
+ def result = sql """
+ CREATE TABLE IF NOT EXISTS ${testTablex} (
+ `@timestamp` int(11) NULL,
+ `clientip` varchar(20) NULL,
+ `request` text NULL,
+ `status` int(11) NULL,
+ `size` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ }
+
+ def testTable = "httplogs"
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_httplogs_dup_table.call(testTable)
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("NewLoadStreamMgr.test_duplicated_load_id")
+ streamLoad {
+ table "${testTable}"
+ // set http request header params
+ set 'label', "test_duplicated_load_id_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ file 'documents-1000.json' // import json file
+ time 10000 // limit inflight 10s
+ // if declared a check callback, the default check condition will ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ assertTrue(json.Message.toLowerCase().contains("already exist"))
+ }
+ }
+
+ qt_sql "SELECT COUNT(*) FROM ${testTable}"
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("NewLoadStreamMgr.test_duplicated_load_id")
+ }
+
+ } finally {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]