This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cbbc15e3347 [fix](cloud) fix be core when using stream load record in
cloud mode (#37903)
cbbc15e3347 is described below
commit cbbc15e33475c62d7b690999f2e973173cb01606
Author: hui lai <[email protected]>
AuthorDate: Sat Jul 20 20:59:41 2024 +0800
[fix](cloud) fix be core when using stream load record in cloud mode
(#37903)
---
be/src/cloud/cloud_backend_service.cpp | 8 +++
be/src/cloud/cloud_backend_service.h | 3 +
be/src/cloud/cloud_storage_engine.cpp | 4 ++
be/src/http/action/http_stream.cpp | 8 ++-
be/src/http/action/stream_load.cpp | 8 ++-
be/src/olap/storage_engine.cpp | 41 ++++++------
be/src/olap/storage_engine.h | 16 ++---
be/src/service/backend_service.cpp | 48 ++++++++------
be/src/service/backend_service.h | 4 ++
.../org/apache/doris/load/StreamLoadRecordMgr.java | 28 ++++----
.../pipeline/cloud_p0/conf/be_custom.conf | 2 +
regression-test/pipeline/p0/conf/be.conf | 1 +
.../stream_load/test_stream_load_record.groovy | 76 ++++++++++++++++++++++
13 files changed, 178 insertions(+), 69 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index f576b60045d..d91e9e416b8 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -29,6 +29,8 @@
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/thrift_server.h"
@@ -186,4 +188,10 @@ void
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
response.status = t_status;
}
+void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult&
result,
+ int64_t
last_stream_record_time) {
+ BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+
_engine.get_stream_load_recorder());
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_backend_service.h
b/be/src/cloud/cloud_backend_service.h
index 88f0099fe73..358cb4d1f0b 100644
--- a/be/src/cloud/cloud_backend_service.h
+++ b/be/src/cloud/cloud_backend_service.h
@@ -53,6 +53,9 @@ public:
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest&
request) override;
+ void get_stream_load_record(TStreamLoadRecordResult& result,
+ int64_t last_stream_record_time) override;
+
private:
CloudStorageEngine& _engine;
};
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index b1b455d2007..de4bbac7b3e 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {
_tablet_hotspot = std::make_unique<TabletHotspot>();
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
+ "init StreamLoadRecorder failed");
+
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index a3439969e60..87cc2f694eb 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -30,6 +30,7 @@
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>
+#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
@@ -119,7 +120,7 @@ void HttpStreamAction::handle(HttpRequest* req) {
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
- if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+ if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
@@ -364,8 +365,9 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
void
HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext>
ctx,
const std::string& str) {
- auto stream_load_recorder =
-
ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+ std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+
ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) +
"_" + ctx->label;
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 93fde511898..2b6a0803e81 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -39,6 +39,7 @@
#include <stdexcept>
#include <utility>
+#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
@@ -217,7 +218,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
- if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+ if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
@@ -705,8 +706,9 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req,
std::string* file_pa
void
StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext>
ctx,
const std::string& str) {
- auto stream_load_recorder =
-
ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+ std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+
ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) +
"_" + ctx->label;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 43093d3183e..f9fe26bb934 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -134,6 +134,25 @@ int64_t
BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
config::memory_limitation_per_thread_for_schema_change_bytes);
}
+Status BaseStorageEngine::init_stream_load_recorder(const std::string&
stream_load_record_path) {
+ LOG(INFO) << "stream load record path: " << stream_load_record_path;
+ // init stream load record rocksdb
+ _stream_load_recorder =
StreamLoadRecorder::create_shared(stream_load_record_path);
+ if (_stream_load_recorder == nullptr) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::MemoryAllocFailed("allocate memory for
StreamLoadRecorder failed"),
+ "new StreamLoadRecorder failed");
+ }
+ auto st = _stream_load_recorder->init();
+ if (!st.ok()) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::IOError("open StreamLoadRecorder rocksdb failed,
path={}",
+ stream_load_record_path),
+ "init StreamLoadRecorder failed");
+ }
+ return Status::OK();
+}
+
static Status _validate_options(const EngineOptions& options) {
if (options.store_paths.empty()) {
return Status::InternalError("store paths is empty");
@@ -158,7 +177,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_tablet_manager(new TabletManager(*this,
config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size,
config::txn_shard_size)),
_default_rowset_type(BETA_ROWSET),
- _stream_load_recorder(nullptr),
_create_tablet_idx_lru_cache(
new
CreateTabletIdxCache(config::partition_disk_index_lru_size)),
_snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
@@ -274,31 +292,12 @@ Status StorageEngine::_init_store_map() {
return Status::InternalError("init path failed, error={}", error_msg);
}
-
RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(_options.store_paths[0].path),
+
RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
"init StreamLoadRecorder failed");
return Status::OK();
}
-Status StorageEngine::_init_stream_load_recorder(const std::string&
stream_load_record_path) {
- LOG(INFO) << "stream load record path: " << stream_load_record_path;
- // init stream load record rocksdb
- _stream_load_recorder =
StreamLoadRecorder::create_shared(stream_load_record_path);
- if (_stream_load_recorder == nullptr) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::MemoryAllocFailed("allocate memory for
StreamLoadRecorder failed"),
- "new StreamLoadRecorder failed");
- }
- auto st = _stream_load_recorder->init();
- if (!st.ok()) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError("open StreamLoadRecorder rocksdb failed,
path={}",
- stream_load_record_path),
- "init StreamLoadRecorder failed");
- }
- return Status::OK();
-}
-
void StorageEngine::_update_storage_medium_type_count() {
set<TStorageMedium::type> available_storage_medium_types;
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6e8fb7bbb7f..b1f30e5db8c 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -133,6 +133,12 @@ public:
int get_disk_num() { return _disk_num; }
+ Status init_stream_load_recorder(const std::string&
stream_load_record_path);
+
+ const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
+ return _stream_load_recorder;
+ }
+
protected:
void _evict_querying_rowset();
void _evict_quring_rowset_thread_callback();
@@ -157,6 +163,8 @@ protected:
int64_t _memory_limitation_bytes_for_schema_change;
int _disk_num {-1};
+
+ std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
};
class StorageEngine final : public BaseStorageEngine {
@@ -246,10 +254,6 @@ public:
bool should_fetch_from_peer(int64_t tablet_id);
- const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
- return _stream_load_recorder;
- }
-
Status get_compaction_status_json(std::string* result);
// check cumulative compaction config
@@ -349,8 +353,6 @@ private:
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
CompactionType compaction_type);
- Status _init_stream_load_recorder(const std::string&
stream_load_record_path);
-
Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType
compaction_type,
bool force);
@@ -470,8 +472,6 @@ private:
std::mutex _compaction_producer_sleep_mutex;
std::condition_variable _compaction_producer_sleep_cv;
- std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
-
// we use unordered_map to store all cumulative compaction policy sharded
ptr
std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>
_cumulative_compaction_policies;
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 4effc225110..d686c12609a 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -923,27 +923,8 @@ void BaseBackendService::close_scanner(TScanCloseResult&
result_, const TScanClo
void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) {
- auto stream_load_recorder = _engine.get_stream_load_recorder();
- if (stream_load_recorder != nullptr) {
- std::map<std::string, std::string> records;
- auto st =
stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
-
config::stream_load_record_batch_size, &records);
- if (st.ok()) {
- LOG(INFO) << "get_batch stream_load_record rocksdb successfully.
records size: "
- << records.size()
- << ", last_stream_load_timestamp: " <<
last_stream_record_time;
- std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
- auto it = records.begin();
- for (; it != records.end(); ++it) {
- TStreamLoadRecord stream_load_item;
- StreamLoadContext::parse_stream_load_record(it->second,
stream_load_item);
- stream_load_record_batch.emplace(it->first.c_str(),
stream_load_item);
- }
- result.__set_stream_load_record(stream_load_record_batch);
- }
- } else {
- LOG(WARNING) << "stream_load_recorder is null.";
- }
+ BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+
_engine.get_stream_load_recorder());
}
void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
@@ -1199,6 +1180,31 @@ void
BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
LOG(ERROR) << "get_stream_load_record is not implemented";
}
+void BaseBackendService::get_stream_load_record(
+ TStreamLoadRecordResult& result, int64_t last_stream_record_time,
+ std::shared_ptr<StreamLoadRecorder> stream_load_recorder) {
+ if (stream_load_recorder != nullptr) {
+ std::map<std::string, std::string> records;
+ auto st =
stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
+
config::stream_load_record_batch_size, &records);
+ if (st.ok()) {
+ LOG(INFO) << "get_batch stream_load_record rocksdb successfully.
records size: "
+ << records.size()
+ << ", last_stream_load_timestamp: " <<
last_stream_record_time;
+ std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
+ auto it = records.begin();
+ for (; it != records.end(); ++it) {
+ TStreamLoadRecord stream_load_item;
+ StreamLoadContext::parse_stream_load_record(it->second,
stream_load_item);
+ stream_load_record_batch.emplace(it->first.c_str(),
stream_load_item);
+ }
+ result.__set_stream_load_record(stream_load_record_batch);
+ }
+ } else {
+ LOG(WARNING) << "stream_load_recorder is null.";
+ }
+}
+
void
BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>&
diskTrashInfos) {
LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
}
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 0ada1bf5393..f0e06094560 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -26,6 +26,7 @@
#include "agent/agent_server.h"
#include "agent/topic_subscriber.h"
#include "common/status.h"
+#include "runtime/stream_load/stream_load_recorder.h"
namespace doris {
@@ -165,6 +166,9 @@ public:
protected:
Status start_plan_fragment_execution(const TExecPlanFragmentParams&
exec_params);
+ void get_stream_load_record(TStreamLoadRecordResult& result, int64_t
last_stream_record_time,
+ std::shared_ptr<StreamLoadRecorder>
stream_load_recorder);
+
ExecEnv* _exec_env = nullptr;
std::unique_ptr<AgentServer> _agent_server;
std::unique_ptr<ThreadPool> _ingest_binlog_workers;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
index f44e8b785f6..6c53f354af8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -150,16 +150,19 @@ public class StreamLoadRecordMgr extends MasterDaemon {
}
public List<StreamLoadItem> getStreamLoadRecords() {
+ LOG.info("test log: {}", streamLoadRecordHeap);
return new ArrayList<>(streamLoadRecordHeap);
}
public List<List<Comparable>> getStreamLoadRecordByDb(
long dbId, String label, boolean accurateMatch, StreamLoadState
state) {
LinkedList<List<Comparable>> streamLoadRecords = new
LinkedList<List<Comparable>>();
+ LOG.info("test log: {}", dbId);
readLock();
try {
if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
+ LOG.info("test log: {}", dbId);
return streamLoadRecords;
}
@@ -202,6 +205,7 @@ public class StreamLoadRecordMgr extends MasterDaemon {
}
}
+ LOG.info("test log: {}", streamLoadRecords);
return streamLoadRecords;
} finally {
readUnlock();
@@ -263,19 +267,17 @@ public class StreamLoadRecordMgr extends MasterDaemon {
TimeUtils.getDatetimeMsFormatWithTimeZone());
String finishTime =
TimeUtils.longToTimeString(streamLoadItem.getFinishTime(),
TimeUtils.getDatetimeMsFormatWithTimeZone());
- if (LOG.isDebugEnabled()) {
- LOG.debug("receive stream load record info from
backend: {}."
- + " label: {}, db: {}, tbl: {}, user:
{}, user_ip: {},"
- + " status: {}, message: {},
error_url: {},"
- + " total_rows: {}, loaded_rows: {},
filtered_rows: {}, unselected_rows: {},"
- + " load_bytes: {}, start_time: {},
finish_time: {}.",
- backend.getHost(), streamLoadItem.getLabel(),
streamLoadItem.getDb(),
- streamLoadItem.getTbl(),
streamLoadItem.getUser(), streamLoadItem.getUserIp(),
- streamLoadItem.getStatus(),
streamLoadItem.getMessage(), streamLoadItem.getUrl(),
- streamLoadItem.getTotalRows(),
streamLoadItem.getLoadedRows(),
- streamLoadItem.getFilteredRows(),
streamLoadItem.getUnselectedRows(),
- streamLoadItem.getLoadBytes(), startTime,
finishTime);
- }
+ LOG.info("receive stream load record info from backend:
{}."
+ + " label: {}, db: {}, tbl: {}, user: {},
user_ip: {},"
+ + " status: {}, message: {}, error_url:
{},"
+ + " total_rows: {}, loaded_rows: {},
filtered_rows: {}, unselected_rows: {},"
+ + " load_bytes: {}, start_time: {},
finish_time: {}.",
+ backend.getHost(), streamLoadItem.getLabel(),
streamLoadItem.getDb(),
+ streamLoadItem.getTbl(), streamLoadItem.getUser(),
streamLoadItem.getUserIp(),
+ streamLoadItem.getStatus(),
streamLoadItem.getMessage(), streamLoadItem.getUrl(),
+ streamLoadItem.getTotalRows(),
streamLoadItem.getLoadedRows(),
+ streamLoadItem.getFilteredRows(),
streamLoadItem.getUnselectedRows(),
+ streamLoadItem.getLoadBytes(), startTime,
finishTime);
AuditEvent auditEvent =
new
StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index a2478b47269..9f2967b1972 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -30,4 +30,6 @@ file_cache_path =
[{"path":"/data/doris_cloud/file_cache","total_size":104857600
tmp_file_dirs =
[{"path":"/data/doris_cloud/tmp","max_cache_bytes":104857600,"max_upload_bytes":104857600}]
thrift_rpc_timeout_ms = 360000
save_load_error_log_to_s3 = true
+enable_stream_load_record = true
+stream_load_record_batch_size = 500
webserver_num_workers = 128
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index 32c7b81f934..a072ac7ad50 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -47,6 +47,7 @@ max_garbage_sweep_interval=180
log_buffer_level = -1
enable_stream_load_record = true
+stream_load_record_batch_size = 500
storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1;/mnt/ssd01/cluster_storage/doris.SSD
disable_auto_compaction=true
priority_networks=172.19.0.0/24
diff --git
a/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy
b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy
new file mode 100644
index 00000000000..96a4fff9c53
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_record.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_record", "p0") {
+ def tableName = "test_stream_load_record"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+ `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+ `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+ `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+ `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+ `v8` datetime REPLACE_IF_NOT_NULL NULL,
+ `v9` date REPLACE_IF_NOT_NULL NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+ `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+ PARTITION partition_b VALUES [("100000"), ("1000000000")),
+ PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+ PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // test strict_mode success
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql"show stream load"
+ log.info("Stream load result: ${res}", res)
+ if (res.size() > 0) {
+ break
+ }
+ if (count > 150) {
+ assertTrue(-1 > 0)
+ }
+ count++
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]