This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4fcd6cd236 [refactor](remove unused code) remove load stream mgr
(#16580)
4fcd6cd236 is described below
commit 4fcd6cd2365b038a5ef3830003d9ca28af960237
Author: yiguolei <[email protected]>
AuthorDate: Fri Feb 10 07:46:18 2023 +0800
[refactor](remove unused code) remove load stream mgr (#16580)
remove old stream load pipe
remove old stream load manager
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/http/action/stream_load.cpp | 2 +-
be/src/io/file_factory.cpp | 10 -
be/src/io/file_factory.h | 4 -
be/src/runtime/CMakeLists.txt | 1 -
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 3 -
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/routine_load/kafka_consumer_pipe.h | 54 -----
be/src/runtime/stream_load/load_stream_mgr.cpp | 36 ---
be/src/runtime/stream_load/load_stream_mgr.h | 72 ------
be/src/runtime/stream_load/new_load_stream_mgr.h | 1 -
be/src/runtime/stream_load/stream_load_context.h | 1 -
be/src/runtime/stream_load/stream_load_pipe.h | 282 ----------------------
be/test/CMakeLists.txt | 3 -
be/test/http/stream_load_test.cpp | 234 ------------------
be/test/runtime/kafka_consumer_pipe_test.cpp | 66 -----
be/test/runtime/stream_load_pipe_test.cpp | 261 --------------------
be/test/vec/exec/vtablet_sink_test.cpp | 3 -
18 files changed, 2 insertions(+), 1036 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 8281064881..a55c0e22c2 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -397,7 +397,7 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req, StreamLoadContext*
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe = std::make_shared<io::StreamLoadPipe>(
- kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /*
min_chunk_size */,
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 42b14f3572..9324a83f27 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -37,7 +37,6 @@
#include "io/s3_writer.h"
#include "olap/iterators.h"
#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
namespace doris {
@@ -198,15 +197,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId&
load_id, io::FileReaderS
return Status::OK();
}
-Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
- std::shared_ptr<FileReader>&
file_reader) {
- file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
- if (!file_reader) {
- return Status::InternalError("unknown stream load id: {}",
UniqueId(load_id).to_string());
- }
- return Status::OK();
-}
-
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
std::shared_ptr<io::FileSystem>*
hdfs_file_system,
io::FileReaderSPtr* reader,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 63ab0f2a83..c78504f9c4 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -80,10 +80,6 @@ public:
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id,
io::FileReaderSPtr* file_reader);
- // [deprecated] Create FileReader for stream load pipe
- static Status create_pipe_reader(const TUniqueId& load_id,
- std::shared_ptr<FileReader>& file_reader);
-
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const
std::string& path,
std::shared_ptr<io::FileSystem>*
hdfs_file_system,
io::FileReaderSPtr* reader,
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 94b95a6fe6..a768c16a55 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -59,7 +59,6 @@ set(RUNTIME_FILES
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
stream_load/stream_load_recorder.cpp
- stream_load/load_stream_mgr.cpp
stream_load/new_load_stream_mgr.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index b646f67589..4d2163cf39 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -47,7 +47,6 @@ class ExternalScanContextMgr;
class FragmentMgr;
class ResultCache;
class LoadPathMgr;
-class LoadStreamMgr;
class NewLoadStreamMgr;
class MemTrackerLimiter;
class MemTracker;
@@ -156,7 +155,6 @@ public:
return _function_client_cache;
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
- LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
@@ -230,7 +228,6 @@ private:
BfdParser* _bfd_parser = nullptr;
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
- LoadStreamMgr* _load_stream_mgr = nullptr;
NewLoadStreamMgr* _new_load_stream_mgr = nullptr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 62330854bb..8b3146cdc2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -42,7 +42,6 @@
#include "runtime/result_queue_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
#include "runtime/small_file_mgr.h"
-#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/tmp_file_mgr.h"
@@ -116,7 +115,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_bfd_parser = BfdParser::create();
_broker_mgr = new BrokerMgr(this);
_load_channel_mgr = new LoadChannelMgr();
- _load_stream_mgr = new LoadStreamMgr();
_new_load_stream_mgr = new NewLoadStreamMgr();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
@@ -313,7 +311,6 @@ void ExecEnv::_destroy() {
_deregister_metrics();
SAFE_DELETE(_internal_client_cache);
SAFE_DELETE(_function_client_cache);
- SAFE_DELETE(_load_stream_mgr);
SAFE_DELETE(_load_channel_mgr);
SAFE_DELETE(_broker_mgr);
SAFE_DELETE(_bfd_parser);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e1bd3651db..d705ff1141 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -506,7 +506,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params) {
stream_load_ctx->need_commit_self = true;
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
- kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /*
min_chunk_size */,
+ io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h
b/be/src/runtime/routine_load/kafka_consumer_pipe.h
deleted file mode 100644
index 6d01bbe091..0000000000
--- a/be/src/runtime/routine_load/kafka_consumer_pipe.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <map>
-#include <string>
-#include <vector>
-
-#include "io/file_reader.h"
-#include "librdkafka/rdkafka.h"
-#include "runtime/message_body_sink.h"
-#include "runtime/stream_load/stream_load_pipe.h"
-
-namespace doris {
-
-class KafkaConsumerPipe : public StreamLoadPipe {
-public:
- KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t
min_chunk_size = 64 * 1024)
- : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}
-
- virtual ~KafkaConsumerPipe() {}
-
- Status append_with_line_delimiter(const char* data, size_t size) {
- Status st = append(data, size);
- if (!st.ok()) {
- return st;
- }
-
- // append the line delimiter
- st = append("\n", 1);
- return st;
- }
-
- Status append_json(const char* data, size_t size) { return
append_and_flush(data, size); }
-};
-
-} // end namespace doris
diff --git a/be/src/runtime/stream_load/load_stream_mgr.cpp
b/be/src/runtime/stream_load/load_stream_mgr.cpp
deleted file mode 100644
index c486dccd8d..0000000000
--- a/be/src/runtime/stream_load/load_stream_mgr.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/stream_load/load_stream_mgr.h"
-
-namespace doris {
-
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(stream_load_pipe_count, MetricUnit::NOUNIT);
-
-LoadStreamMgr::LoadStreamMgr() {
- // Each StreamLoadPipe has a limited buffer size (default 1M), it's not
needed to count the
- // actual size of all StreamLoadPipe.
- REGISTER_HOOK_METRIC(stream_load_pipe_count, [this]() {
- // std::lock_guard<std::mutex> l(_lock);
- return _stream_map.size();
- });
-}
-
-LoadStreamMgr::~LoadStreamMgr() {
- DEREGISTER_HOOK_METRIC(stream_load_pipe_count);
-}
-} // namespace doris
diff --git a/be/src/runtime/stream_load/load_stream_mgr.h
b/be/src/runtime/stream_load/load_stream_mgr.h
deleted file mode 100644
index b374fe0c32..0000000000
--- a/be/src/runtime/stream_load/load_stream_mgr.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-#include "runtime/stream_load/stream_load_pipe.h" // for StreamLoadPipe
-#include "util/doris_metrics.h"
-#include "util/uid_util.h" // for std::hash for UniqueId
-
-namespace doris {
-
-// used to register all streams in process so that other module can get this
stream
-class LoadStreamMgr {
-public:
- LoadStreamMgr();
- ~LoadStreamMgr();
-
- Status put(const UniqueId& id, std::shared_ptr<StreamLoadPipe> stream) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _stream_map.find(id);
- if (it != std::end(_stream_map)) {
- return Status::InternalError("id already exist");
- }
- _stream_map.emplace(id, stream);
- VLOG_NOTICE << "put stream load pipe: " << id;
- return Status::OK();
- }
-
- std::shared_ptr<StreamLoadPipe> get(const UniqueId& id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _stream_map.find(id);
- if (it == std::end(_stream_map)) {
- return nullptr;
- }
- auto stream = it->second;
- _stream_map.erase(it);
- return stream;
- }
-
- void remove(const UniqueId& id) {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _stream_map.find(id);
- if (it != std::end(_stream_map)) {
- _stream_map.erase(it);
- VLOG_NOTICE << "remove stream load pipe: " << id;
- }
- }
-
-private:
- std::mutex _lock;
- std::unordered_map<UniqueId, std::shared_ptr<StreamLoadPipe>> _stream_map;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h
b/be/src/runtime/stream_load/new_load_stream_mgr.h
index 60cd3aa446..9ab2030487 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -28,7 +28,6 @@
namespace doris {
// used to register all streams in process so that other module can get this
stream
-// TODO(ftw): should be renamed to `LoadStreamMgr` after new file reader is
ready.
class NewLoadStreamMgr {
public:
NewLoadStreamMgr();
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index 0fc27a27f9..ef3602ab8f 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -28,7 +28,6 @@
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "service/backend_options.h"
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h
b/be/src/runtime/stream_load/stream_load_pipe.h
deleted file mode 100644
index f32048da54..0000000000
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ /dev/null
@@ -1,282 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <deque>
-#include <mutex>
-
-#include "gen_cpp/internal_service.pb.h"
-#include "io/file_reader.h"
-#include "runtime/message_body_sink.h"
-#include "runtime/thread_context.h"
-#include "util/bit_util.h"
-#include "util/byte_buffer.h"
-
-namespace doris {
-
-const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
-// StreamLoadPipe use to transfer data from producer to consumer
-// Data in pip is stored in chunks.
-
-class StreamLoadPipe : public MessageBodySink, public FileReader {
-public:
- StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
- size_t min_chunk_size = 64 * 1024, int64_t total_length =
-1,
- bool use_proto = false)
- : _buffered_bytes(0),
- _proto_buffered_bytes(0),
- _max_buffered_bytes(max_buffered_bytes),
- _min_chunk_size(min_chunk_size),
- _total_length(total_length),
- _use_proto(use_proto) {}
-
- ~StreamLoadPipe() override {
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
- while (!_buf_queue.empty()) {
- _buf_queue.pop_front();
- }
- }
-
- Status open() override { return Status::OK(); }
-
- Status append_and_flush(const char* data, size_t size, size_t
proto_byte_size = 0) {
- ByteBufferPtr buf =
ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
- buf->put_bytes(data, size);
- buf->flip();
- return _append(buf, proto_byte_size);
- }
-
- Status append(const char* data, size_t size) override {
- size_t pos = 0;
- if (_write_buf != nullptr) {
- if (size < _write_buf->remaining()) {
- _write_buf->put_bytes(data, size);
- return Status::OK();
- } else {
- pos = _write_buf->remaining();
- _write_buf->put_bytes(data, pos);
-
- _write_buf->flip();
- RETURN_IF_ERROR(_append(_write_buf));
- _write_buf.reset();
- }
- }
- // need to allocate a new chunk, min chunk is 64k
- size_t chunk_size = std::max(_min_chunk_size, size - pos);
- chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
- _write_buf = ByteBuffer::allocate(chunk_size);
- _write_buf->put_bytes(data + pos, size - pos);
- return Status::OK();
- }
-
- Status append(const ByteBufferPtr& buf) override {
- if (_write_buf != nullptr) {
- _write_buf->flip();
- RETURN_IF_ERROR(_append(_write_buf));
- _write_buf.reset();
- }
- return _append(buf);
- }
-
- // If _total_length == -1, this should be a Kafka routine load task,
- // just get the next buffer directly from the buffer queue, because one
buffer contains a complete piece of data.
- // Otherwise, this should be a stream load task that needs to read the
specified amount of data.
- Status read_one_message(std::unique_ptr<uint8_t[]>* data, int64_t* length)
override {
- if (_total_length < -1) {
- return Status::InternalError("invalid, _total_length is: {}",
_total_length);
- } else if (_total_length == 0) {
- // no data
- *length = 0;
- return Status::OK();
- }
-
- if (_total_length == -1) {
- return _read_next_buffer(data, length);
- }
-
- // _total_length > 0, read the entire data
- data->reset(new uint8_t[_total_length]);
- bool eof = false;
- Status st = read(data->get(), _total_length, length, &eof);
- if (eof) {
- *length = 0;
- }
- return st;
- }
-
- Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool*
eof) override {
-
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
- *bytes_read = 0;
- while (*bytes_read < data_size) {
- std::unique_lock<std::mutex> l(_lock);
- while (!_cancelled && !_finished && _buf_queue.empty()) {
- _get_cond.wait(l);
- }
- // cancelled
- if (_cancelled) {
- return Status::InternalError("cancelled: {}",
_cancelled_reason);
- }
- // finished
- if (_buf_queue.empty()) {
- DCHECK(_finished);
- data_size = *bytes_read;
- *eof = (*bytes_read == 0);
- return Status::OK();
- }
- auto buf = _buf_queue.front();
- int64_t copy_size = std::min(data_size - *bytes_read,
(int64_t)buf->remaining());
- buf->get_bytes((char*)data + *bytes_read, copy_size);
- *bytes_read += copy_size;
- if (!buf->has_remaining()) {
- _buf_queue.pop_front();
- _buffered_bytes -= buf->limit;
- _put_cond.notify_one();
- }
- }
- DCHECK(*bytes_read == data_size)
- << "*bytes_read=" << *bytes_read << ", data_size=" <<
data_size;
- *eof = false;
- return Status::OK();
- }
-
- Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void*
out) override {
- return Status::InternalError("Not implemented");
- }
-
- int64_t size() override { return 0; }
-
- Status seek(int64_t position) override { return Status::InternalError("Not
implemented"); }
-
- Status tell(int64_t* position) override { return
Status::InternalError("Not implemented"); }
-
- // called when consumer finished
- void close() override { cancel("closed"); }
-
- bool closed() override { return _cancelled; }
-
- // called when producer finished
- Status finish() override {
- if (_write_buf != nullptr) {
- _write_buf->flip();
- _append(_write_buf);
- _write_buf.reset();
- }
- {
- std::lock_guard<std::mutex> l(_lock);
- _finished = true;
- }
- _get_cond.notify_all();
- return Status::OK();
- }
-
- // called when producer/consumer failed
- void cancel(const std::string& reason) override {
- {
- std::lock_guard<std::mutex> l(_lock);
- _cancelled = true;
- _cancelled_reason = reason;
- }
- _get_cond.notify_all();
- _put_cond.notify_all();
- }
-
-private:
- // read the next buffer from _buf_queue
- Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t*
length) {
- std::unique_lock<std::mutex> l(_lock);
- while (!_cancelled && !_finished && _buf_queue.empty()) {
- _get_cond.wait(l);
- }
- // cancelled
- if (_cancelled) {
- return Status::InternalError("cancelled: {}", _cancelled_reason);
- }
- // finished
- if (_buf_queue.empty()) {
- DCHECK(_finished);
- data->reset();
- *length = 0;
- return Status::OK();
- }
- auto buf = _buf_queue.front();
- *length = buf->remaining();
- data->reset(new uint8_t[*length]);
- buf->get_bytes((char*)(data->get()), *length);
- _buf_queue.pop_front();
- _buffered_bytes -= buf->limit;
- if (_use_proto) {
- PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
- _proto_buffered_bytes -= (sizeof(PDataRow*) +
(*ptr)->GetCachedSize());
- }
- _put_cond.notify_one();
- return Status::OK();
- }
-
- Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) {
- {
- std::unique_lock<std::mutex> l(_lock);
- // if _buf_queue is empty, we append this buf without size check
- if (_use_proto) {
- while (!_cancelled && !_buf_queue.empty() &&
- (_proto_buffered_bytes + proto_byte_size >
_max_buffered_bytes)) {
- _put_cond.wait(l);
- }
- } else {
- while (!_cancelled && !_buf_queue.empty() &&
- _buffered_bytes + buf->remaining() >
_max_buffered_bytes) {
- _put_cond.wait(l);
- }
- }
- if (_cancelled) {
- return Status::InternalError("cancelled: {}",
_cancelled_reason);
- }
- _buf_queue.push_back(buf);
- if (_use_proto) {
- _proto_buffered_bytes += proto_byte_size;
- } else {
- _buffered_bytes += buf->remaining();
- }
- }
- _get_cond.notify_one();
- return Status::OK();
- }
-
- // Blocking queue
- std::mutex _lock;
- size_t _buffered_bytes;
- size_t _proto_buffered_bytes;
- size_t _max_buffered_bytes;
- size_t _min_chunk_size;
- // The total amount of data expected to be read.
- // In some scenarios, such as loading json format data through stream load,
- // the data needs to be completely read before it can be parsed,
- // so the total size of the data needs to be known.
- // The default is -1, which means that the data arrives in a stream
- // and the length is unknown.
- // size_t is unsigned, so use int64_t
- int64_t _total_length = -1;
- bool _use_proto = false;
- std::deque<ByteBufferPtr> _buf_queue;
- std::condition_variable _put_cond;
- std::condition_variable _get_cond;
-
- ByteBufferPtr _write_buf;
-};
-
-} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 444416bce3..7fa7493bf6 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -66,7 +66,6 @@ set(HTTP_TEST_FILES
http/http_utils_test.cpp
http/http_client_test.cpp
# TODO this will overide HttpChannel and make other test failed
- # http/stream_load_test.cpp
# http/metrics_action_test.cpp
)
set(IO_TEST_FILES
@@ -145,10 +144,8 @@ set(RUNTIME_TEST_FILES
runtime/string_value_test.cpp
runtime/fragment_mgr_test.cpp
runtime/mem_limit_test.cpp
- runtime/stream_load_pipe_test.cpp
runtime/snapshot_loader_test.cpp
runtime/user_function_cache_test.cpp
- runtime/kafka_consumer_pipe_test.cpp
runtime/routine_load_task_executor_test.cpp
runtime/small_file_mgr_test.cpp
runtime/heartbeat_flags_test.cpp
diff --git a/be/test/http/stream_load_test.cpp
b/be/test/http/stream_load_test.cpp
deleted file mode 100644
index 1609eedf80..0000000000
--- a/be/test/http/stream_load_test.cpp
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "http/action/stream_load.h"
-
-#include <event2/http.h>
-#include <event2/http_struct.h>
-#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-
-#include "exec/schema_scanner/schema_helper.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "http/http_channel.h"
-#include "http/http_request.h"
-#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
-#include "runtime/stream_load/stream_load_executor.h"
-#include "util/brpc_client_cache.h"
-#include "util/cpu_info.h"
-
-struct mg_connection;
-
-namespace doris {
-
-std::string k_response_str;
-
-// Send Unauthorized status with basic challenge
-void HttpChannel::send_basic_challenge(HttpRequest* req, const std::string&
realm) {}
-
-void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {}
-
-void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {}
-
-void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const
std::string& content) {
- k_response_str = content;
-}
-
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t
size) {}
-
-extern TLoadTxnBeginResult k_stream_load_begin_result;
-extern TLoadTxnCommitResult k_stream_load_commit_result;
-extern TLoadTxnRollbackResult k_stream_load_rollback_result;
-extern TStreamLoadPutResult k_stream_load_put_result;
-extern Status k_stream_load_plan_status;
-
-class StreamLoadActionTest : public testing::Test {
-public:
- StreamLoadActionTest() {}
- virtual ~StreamLoadActionTest() {}
- void SetUp() override {
- k_stream_load_begin_result = TLoadTxnBeginResult();
- k_stream_load_commit_result = TLoadTxnCommitResult();
- k_stream_load_rollback_result = TLoadTxnRollbackResult();
- k_stream_load_put_result = TStreamLoadPutResult();
- k_stream_load_plan_status = Status::OK();
- k_response_str = "";
- config::streaming_load_max_mb = 1;
-
- _env._master_info = new TMasterInfo();
- _env._load_stream_mgr = new LoadStreamMgr();
- _env._internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
- _env._function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
- _env._stream_load_executor = new StreamLoadExecutor(&_env);
-
- _evhttp_req = evhttp_request_new(nullptr, nullptr);
- }
- void TearDown() override {
- delete _env._internal_client_cache;
- _env._internal_client_cache = nullptr;
- delete _env._function_client_cache;
- _env._function_client_cache = nullptr;
- delete _env._load_stream_mgr;
- _env._load_stream_mgr = nullptr;
- delete _env._master_info;
- _env._master_info = nullptr;
- delete _env._stream_load_executor;
- _env._stream_load_executor = nullptr;
-
- if (_evhttp_req != nullptr) {
- evhttp_request_free(_evhttp_req);
- }
- }
-
-private:
- ExecEnv _env;
- evhttp_request* _evhttp_req = nullptr;
-};
-
-TEST_F(StreamLoadActionTest, no_auth) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, normal) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
-
- struct evhttp_request ev_req;
- ev_req.remote_host = nullptr;
- request._ev_req = &ev_req;
-
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "0");
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Success", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, put_fail) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
-
- struct evhttp_request ev_req;
- ev_req.remote_host = nullptr;
- request._ev_req = &ev_req;
-
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
- Status status = Status::InternalError("TestFail");
- status.to_thrift(&k_stream_load_put_result.status);
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, commit_fail) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
- struct evhttp_request ev_req;
- ev_req.remote_host = nullptr;
- request._ev_req = &ev_req;
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
- Status status = Status::InternalError("TestFail");
- status.to_thrift(&k_stream_load_commit_result.status);
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, begin_fail) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
- struct evhttp_request ev_req;
- ev_req.remote_host = nullptr;
- request._ev_req = &ev_req;
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
- Status status = Status::InternalError("TestFail");
- status.to_thrift(&k_stream_load_begin_result.status);
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-#if 0
-TEST_F(StreamLoadActionTest, receive_failed) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::TRANSFER_ENCODING, "chunked");
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-#endif
-
-TEST_F(StreamLoadActionTest, plan_fail) {
- StreamLoadAction action(&_env);
-
- HttpRequest request(_evhttp_req);
- struct evhttp_request ev_req;
- ev_req.remote_host = nullptr;
- request._ev_req = &ev_req;
- request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
- request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
- k_stream_load_plan_status = Status::InternalError("TestFail");
- request.set_handler(&action);
- action.on_header(&request);
- action.handle(&request);
-
- rapidjson::Document doc;
- doc.Parse(k_response_str.c_str());
- EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-} // namespace doris
diff --git a/be/test/runtime/kafka_consumer_pipe_test.cpp
b/be/test/runtime/kafka_consumer_pipe_test.cpp
deleted file mode 100644
index ee10e42eee..0000000000
--- a/be/test/runtime/kafka_consumer_pipe_test.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/routine_load/kafka_consumer_pipe.h"
-
-#include <gtest/gtest.h>
-
-namespace doris {
-
-class KafkaConsumerPipeTest : public testing::Test {
-public:
- KafkaConsumerPipeTest() {}
- virtual ~KafkaConsumerPipeTest() {}
-
- void SetUp() override {}
-
- void TearDown() override {}
-
-private:
-};
-
-TEST_F(KafkaConsumerPipeTest, append_read) {
- KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024);
-
- std::string msg1 = "i have a dream";
- std::string msg2 = "This is from kafka";
-
- Status st;
- st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
- EXPECT_TRUE(st.ok());
- st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
- EXPECT_TRUE(st.ok());
- st = k_pipe.finish();
- EXPECT_TRUE(st.ok());
-
- char buf[1024];
- int64_t data_size = 1024;
- int64_t read_bytes = 0;
- bool eof = false;
- st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(read_bytes, msg1.length() + msg2.length() + 2);
- EXPECT_EQ(eof, false);
-
- data_size = 1024;
- st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(read_bytes, 0);
- EXPECT_EQ(eof, true);
-}
-
-} // namespace doris
diff --git a/be/test/runtime/stream_load_pipe_test.cpp
b/be/test/runtime/stream_load_pipe_test.cpp
deleted file mode 100644
index 93c2cd91c2..0000000000
--- a/be/test/runtime/stream_load_pipe_test.cpp
+++ /dev/null
@@ -1,261 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/stream_load/stream_load_pipe.h"
-
-#include <gtest/gtest.h>
-
-#include <thread>
-
-namespace doris {
-
-class StreamLoadPipeTest : public testing::Test {
-public:
- StreamLoadPipeTest() {}
- virtual ~StreamLoadPipeTest() {}
- void SetUp() override {}
-};
-
-TEST_F(StreamLoadPipeTest, append_buffer) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- int k = 0;
- for (int i = 0; i < 2; ++i) {
- auto byte_buf = ByteBuffer::allocate(64);
- char buf[64];
- for (int j = 0; j < 64; ++j) {
- buf[j] = '0' + (k++ % 10);
- }
- byte_buf->put_bytes(buf, 64);
- byte_buf->flip();
- pipe.append(byte_buf);
- }
- pipe.finish();
- };
- std::thread t1(appender);
-
- char buf[256];
- int64_t buf_len = 256;
- int64_t read_bytes = 0;
- bool eof = false;
- auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(128, read_bytes);
- EXPECT_FALSE(eof);
- for (int i = 0; i < 128; ++i) {
- EXPECT_EQ('0' + (i % 10), buf[i]);
- }
- st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(0, read_bytes);
- EXPECT_TRUE(eof);
-
- t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_bytes) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- for (int i = 0; i < 128; ++i) {
- char buf = '0' + (i % 10);
- pipe.append(&buf, 1);
- }
- pipe.finish();
- };
- std::thread t1(appender);
-
- char buf[256];
- int64_t buf_len = 256;
- int64_t read_bytes = 0;
- bool eof = false;
- auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(128, read_bytes);
- EXPECT_FALSE(eof);
- for (int i = 0; i < 128; ++i) {
- EXPECT_EQ('0' + (i % 10), buf[i]);
- }
- st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(0, read_bytes);
- EXPECT_TRUE(eof);
-
- t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_bytes2) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- for (int i = 0; i < 128; ++i) {
- char buf = '0' + (i % 10);
- pipe.append(&buf, 1);
- }
- pipe.finish();
- };
- std::thread t1(appender);
-
- char buf[128];
- int64_t buf_len = 62;
- int64_t read_bytes = 0;
- bool eof = false;
- auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(62, read_bytes);
- EXPECT_FALSE(eof);
- for (int i = 0; i < 62; ++i) {
- EXPECT_EQ('0' + (i % 10), buf[i]);
- }
- for (int i = 62; i < 128; ++i) {
- char ch;
- buf_len = 1;
- auto st = pipe.read((uint8_t*)&ch, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(1, read_bytes);
- EXPECT_FALSE(eof);
- EXPECT_EQ('0' + (i % 10), ch);
- }
- st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(0, read_bytes);
- EXPECT_TRUE(eof);
-
- t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_mix) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- // 10
- int k = 0;
- for (int i = 0; i < 10; ++i) {
- char buf = '0' + (k++ % 10);
- pipe.append(&buf, 1);
- }
- // 60
- {
- auto byte_buf = ByteBuffer::allocate(60);
- char buf[60];
- for (int j = 0; j < 60; ++j) {
- buf[j] = '0' + (k++ % 10);
- }
- byte_buf->put_bytes(buf, 60);
- byte_buf->flip();
- pipe.append(byte_buf);
- }
- // 8
- for (int i = 0; i < 8; ++i) {
- char buf = '0' + (k++ % 10);
- pipe.append(&buf, 1);
- }
- // 50
- {
- auto byte_buf = ByteBuffer::allocate(50);
- char buf[50];
- for (int j = 0; j < 50; ++j) {
- buf[j] = '0' + (k++ % 10);
- }
- byte_buf->put_bytes(buf, 50);
- byte_buf->flip();
- pipe.append(byte_buf);
- }
- pipe.finish();
- };
- std::thread t1(appender);
-
- char buf[128];
- int64_t buf_len = 128;
- int64_t read_bytes = 0;
- bool eof = false;
- auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(128, read_bytes);
- EXPECT_FALSE(eof);
- for (int i = 0; i < 128; ++i) {
- EXPECT_EQ('0' + (i % 10), buf[i]);
- }
- st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(0, read_bytes);
- EXPECT_TRUE(eof);
-
- t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, cancel) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- int k = 0;
- for (int i = 0; i < 10; ++i) {
- char buf = '0' + (k++ % 10);
- pipe.append(&buf, 1);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- pipe.cancel("test");
- };
- std::thread t1(appender);
-
- char buf[128];
- int64_t buf_len = 128;
- int64_t read_bytes = 0;
- bool eof = false;
- auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
- EXPECT_FALSE(st.ok());
- t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, close) {
- StreamLoadPipe pipe(66, 64);
-
- auto appender = [&pipe] {
- int k = 0;
- {
- auto byte_buf = ByteBuffer::allocate(64);
- char buf[64];
- for (int j = 0; j < 64; ++j) {
- buf[j] = '0' + (k++ % 10);
- }
- byte_buf->put_bytes(buf, 64);
- byte_buf->flip();
- pipe.append(byte_buf);
- }
- {
- auto byte_buf = ByteBuffer::allocate(64);
- char buf[64];
- for (int j = 0; j < 64; ++j) {
- buf[j] = '0' + (k++ % 10);
- }
- byte_buf->put_bytes(buf, 64);
- byte_buf->flip();
- auto st = pipe.append(byte_buf);
- EXPECT_FALSE(st.ok());
- }
- };
- std::thread t1(appender);
-
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
- pipe.close();
-
- t1.join();
-}
-
-} // namespace doris
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp
b/be/test/vec/exec/vtablet_sink_test.cpp
index 05fca396dc..1182e1cb8e 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -30,7 +30,6 @@
#include "runtime/exec_env.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/runtime_state.h"
-#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/types.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
@@ -344,7 +343,6 @@ public:
k_add_batch_status = Status::OK();
_env = ExecEnv::GetInstance();
_env->_master_info = new TMasterInfo();
- _env->_load_stream_mgr = new LoadStreamMgr();
_env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
ThreadPoolBuilder("SendBatchThreadPool")
@@ -359,7 +357,6 @@ public:
void TearDown() override {
SAFE_DELETE(_env->_internal_client_cache);
SAFE_DELETE(_env->_function_client_cache);
- SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
if (_server) {
_server->Stop(100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]