This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fcd6cd236 [refactor](remove unused code) remove load stream mgr 
(#16580)
4fcd6cd236 is described below

commit 4fcd6cd2365b038a5ef3830003d9ca28af960237
Author: yiguolei <[email protected]>
AuthorDate: Fri Feb 10 07:46:18 2023 +0800

    [refactor](remove unused code) remove load stream mgr (#16580)
    
    remove old stream load pipe
    remove old stream load manager
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/http/action/stream_load.cpp                |   2 +-
 be/src/io/file_factory.cpp                        |  10 -
 be/src/io/file_factory.h                          |   4 -
 be/src/runtime/CMakeLists.txt                     |   1 -
 be/src/runtime/exec_env.h                         |   3 -
 be/src/runtime/exec_env_init.cpp                  |   3 -
 be/src/runtime/fragment_mgr.cpp                   |   2 +-
 be/src/runtime/routine_load/kafka_consumer_pipe.h |  54 -----
 be/src/runtime/stream_load/load_stream_mgr.cpp    |  36 ---
 be/src/runtime/stream_load/load_stream_mgr.h      |  72 ------
 be/src/runtime/stream_load/new_load_stream_mgr.h  |   1 -
 be/src/runtime/stream_load/stream_load_context.h  |   1 -
 be/src/runtime/stream_load/stream_load_pipe.h     | 282 ----------------------
 be/test/CMakeLists.txt                            |   3 -
 be/test/http/stream_load_test.cpp                 | 234 ------------------
 be/test/runtime/kafka_consumer_pipe_test.cpp      |  66 -----
 be/test/runtime/stream_load_pipe_test.cpp         | 261 --------------------
 be/test/vec/exec/vtablet_sink_test.cpp            |   3 -
 18 files changed, 2 insertions(+), 1036 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 8281064881..a55c0e22c2 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -397,7 +397,7 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
     request.__set_loadId(ctx->id.to_thrift());
     if (ctx->use_streaming) {
         auto pipe = std::make_shared<io::StreamLoadPipe>(
-                kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* 
min_chunk_size */,
+                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
                 ctx->body_bytes /* total_length */);
         RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
 
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 42b14f3572..9324a83f27 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -37,7 +37,6 @@
 #include "io/s3_writer.h"
 #include "olap/iterators.h"
 #include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 
 namespace doris {
@@ -198,15 +197,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& 
load_id, io::FileReaderS
     return Status::OK();
 }
 
-Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
-                                       std::shared_ptr<FileReader>& 
file_reader) {
-    file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
-    if (!file_reader) {
-        return Status::InternalError("unknown stream load id: {}", 
UniqueId(load_id).to_string());
-    }
-    return Status::OK();
-}
-
 Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
                                        std::shared_ptr<io::FileSystem>* 
hdfs_file_system,
                                        io::FileReaderSPtr* reader,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 63ab0f2a83..c78504f9c4 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -80,10 +80,6 @@ public:
     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, 
io::FileReaderSPtr* file_reader);
 
-    // [deprecated] Create FileReader for stream load pipe
-    static Status create_pipe_reader(const TUniqueId& load_id,
-                                     std::shared_ptr<FileReader>& file_reader);
-
     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const 
std::string& path,
                                      std::shared_ptr<io::FileSystem>* 
hdfs_file_system,
                                      io::FileReaderSPtr* reader,
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 94b95a6fe6..a768c16a55 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -59,7 +59,6 @@ set(RUNTIME_FILES
     stream_load/stream_load_context.cpp
     stream_load/stream_load_executor.cpp
     stream_load/stream_load_recorder.cpp
-    stream_load/load_stream_mgr.cpp
     stream_load/new_load_stream_mgr.cpp
     routine_load/data_consumer.cpp
     routine_load/data_consumer_group.cpp
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index b646f67589..4d2163cf39 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -47,7 +47,6 @@ class ExternalScanContextMgr;
 class FragmentMgr;
 class ResultCache;
 class LoadPathMgr;
-class LoadStreamMgr;
 class NewLoadStreamMgr;
 class MemTrackerLimiter;
 class MemTracker;
@@ -156,7 +155,6 @@ public:
         return _function_client_cache;
     }
     LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
-    LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
     NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr; }
     SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
     BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
@@ -230,7 +228,6 @@ private:
     BfdParser* _bfd_parser = nullptr;
     BrokerMgr* _broker_mgr = nullptr;
     LoadChannelMgr* _load_channel_mgr = nullptr;
-    LoadStreamMgr* _load_stream_mgr = nullptr;
     NewLoadStreamMgr* _new_load_stream_mgr = nullptr;
     BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
     BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 62330854bb..8b3146cdc2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -42,7 +42,6 @@
 #include "runtime/result_queue_mgr.h"
 #include "runtime/routine_load/routine_load_task_executor.h"
 #include "runtime/small_file_mgr.h"
-#include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "runtime/tmp_file_mgr.h"
@@ -116,7 +115,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     _bfd_parser = BfdParser::create();
     _broker_mgr = new BrokerMgr(this);
     _load_channel_mgr = new LoadChannelMgr();
-    _load_stream_mgr = new LoadStreamMgr();
     _new_load_stream_mgr = new NewLoadStreamMgr();
     _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
     _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
@@ -313,7 +311,6 @@ void ExecEnv::_destroy() {
     _deregister_metrics();
     SAFE_DELETE(_internal_client_cache);
     SAFE_DELETE(_function_client_cache);
-    SAFE_DELETE(_load_stream_mgr);
     SAFE_DELETE(_load_channel_mgr);
     SAFE_DELETE(_broker_mgr);
     SAFE_DELETE(_bfd_parser);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e1bd3651db..d705ff1141 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -506,7 +506,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params) {
         stream_load_ctx->need_commit_self = true;
         stream_load_ctx->need_rollback = true;
         auto pipe = std::make_shared<io::StreamLoadPipe>(
-                kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* 
min_chunk_size */,
+                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 
/* min_chunk_size */,
                 -1 /* total_length */, true /* use_proto */);
         stream_load_ctx->body_sink = pipe;
         stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h 
b/be/src/runtime/routine_load/kafka_consumer_pipe.h
deleted file mode 100644
index 6d01bbe091..0000000000
--- a/be/src/runtime/routine_load/kafka_consumer_pipe.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <map>
-#include <string>
-#include <vector>
-
-#include "io/file_reader.h"
-#include "librdkafka/rdkafka.h"
-#include "runtime/message_body_sink.h"
-#include "runtime/stream_load/stream_load_pipe.h"
-
-namespace doris {
-
-class KafkaConsumerPipe : public StreamLoadPipe {
-public:
-    KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t 
min_chunk_size = 64 * 1024)
-            : StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}
-
-    virtual ~KafkaConsumerPipe() {}
-
-    Status append_with_line_delimiter(const char* data, size_t size) {
-        Status st = append(data, size);
-        if (!st.ok()) {
-            return st;
-        }
-
-        // append the line delimiter
-        st = append("\n", 1);
-        return st;
-    }
-
-    Status append_json(const char* data, size_t size) { return 
append_and_flush(data, size); }
-};
-
-} // end namespace doris
diff --git a/be/src/runtime/stream_load/load_stream_mgr.cpp 
b/be/src/runtime/stream_load/load_stream_mgr.cpp
deleted file mode 100644
index c486dccd8d..0000000000
--- a/be/src/runtime/stream_load/load_stream_mgr.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/stream_load/load_stream_mgr.h"
-
-namespace doris {
-
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(stream_load_pipe_count, MetricUnit::NOUNIT);
-
-LoadStreamMgr::LoadStreamMgr() {
-    // Each StreamLoadPipe has a limited buffer size (default 1M), it's not 
needed to count the
-    // actual size of all StreamLoadPipe.
-    REGISTER_HOOK_METRIC(stream_load_pipe_count, [this]() {
-        // std::lock_guard<std::mutex> l(_lock);
-        return _stream_map.size();
-    });
-}
-
-LoadStreamMgr::~LoadStreamMgr() {
-    DEREGISTER_HOOK_METRIC(stream_load_pipe_count);
-}
-} // namespace doris
diff --git a/be/src/runtime/stream_load/load_stream_mgr.h 
b/be/src/runtime/stream_load/load_stream_mgr.h
deleted file mode 100644
index b374fe0c32..0000000000
--- a/be/src/runtime/stream_load/load_stream_mgr.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-#include "runtime/stream_load/stream_load_pipe.h" // for StreamLoadPipe
-#include "util/doris_metrics.h"
-#include "util/uid_util.h" // for std::hash for UniqueId
-
-namespace doris {
-
-// used to register all streams in process so that other module can get this 
stream
-class LoadStreamMgr {
-public:
-    LoadStreamMgr();
-    ~LoadStreamMgr();
-
-    Status put(const UniqueId& id, std::shared_ptr<StreamLoadPipe> stream) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _stream_map.find(id);
-        if (it != std::end(_stream_map)) {
-            return Status::InternalError("id already exist");
-        }
-        _stream_map.emplace(id, stream);
-        VLOG_NOTICE << "put stream load pipe: " << id;
-        return Status::OK();
-    }
-
-    std::shared_ptr<StreamLoadPipe> get(const UniqueId& id) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _stream_map.find(id);
-        if (it == std::end(_stream_map)) {
-            return nullptr;
-        }
-        auto stream = it->second;
-        _stream_map.erase(it);
-        return stream;
-    }
-
-    void remove(const UniqueId& id) {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _stream_map.find(id);
-        if (it != std::end(_stream_map)) {
-            _stream_map.erase(it);
-            VLOG_NOTICE << "remove stream load pipe: " << id;
-        }
-    }
-
-private:
-    std::mutex _lock;
-    std::unordered_map<UniqueId, std::shared_ptr<StreamLoadPipe>> _stream_map;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h 
b/be/src/runtime/stream_load/new_load_stream_mgr.h
index 60cd3aa446..9ab2030487 100644
--- a/be/src/runtime/stream_load/new_load_stream_mgr.h
+++ b/be/src/runtime/stream_load/new_load_stream_mgr.h
@@ -28,7 +28,6 @@
 namespace doris {
 
 // used to register all streams in process so that other module can get this 
stream
-// TODO(ftw): should be renamed to `LoadStreamMgr` after new file reader is 
ready.
 class NewLoadStreamMgr {
 public:
     NewLoadStreamMgr();
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index 0fc27a27f9..ef3602ab8f 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -28,7 +28,6 @@
 #include "gen_cpp/BackendService_types.h"
 #include "gen_cpp/FrontendService_types.h"
 #include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "service/backend_options.h"
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h 
b/be/src/runtime/stream_load/stream_load_pipe.h
deleted file mode 100644
index f32048da54..0000000000
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ /dev/null
@@ -1,282 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <deque>
-#include <mutex>
-
-#include "gen_cpp/internal_service.pb.h"
-#include "io/file_reader.h"
-#include "runtime/message_body_sink.h"
-#include "runtime/thread_context.h"
-#include "util/bit_util.h"
-#include "util/byte_buffer.h"
-
-namespace doris {
-
-const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
-// StreamLoadPipe use to transfer data from producer to consumer
-// Data in pip is stored in chunks.
-
-class StreamLoadPipe : public MessageBodySink, public FileReader {
-public:
-    StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
-                   size_t min_chunk_size = 64 * 1024, int64_t total_length = 
-1,
-                   bool use_proto = false)
-            : _buffered_bytes(0),
-              _proto_buffered_bytes(0),
-              _max_buffered_bytes(max_buffered_bytes),
-              _min_chunk_size(min_chunk_size),
-              _total_length(total_length),
-              _use_proto(use_proto) {}
-
-    ~StreamLoadPipe() override {
-        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-        while (!_buf_queue.empty()) {
-            _buf_queue.pop_front();
-        }
-    }
-
-    Status open() override { return Status::OK(); }
-
-    Status append_and_flush(const char* data, size_t size, size_t 
proto_byte_size = 0) {
-        ByteBufferPtr buf = 
ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
-        buf->put_bytes(data, size);
-        buf->flip();
-        return _append(buf, proto_byte_size);
-    }
-
-    Status append(const char* data, size_t size) override {
-        size_t pos = 0;
-        if (_write_buf != nullptr) {
-            if (size < _write_buf->remaining()) {
-                _write_buf->put_bytes(data, size);
-                return Status::OK();
-            } else {
-                pos = _write_buf->remaining();
-                _write_buf->put_bytes(data, pos);
-
-                _write_buf->flip();
-                RETURN_IF_ERROR(_append(_write_buf));
-                _write_buf.reset();
-            }
-        }
-        // need to allocate a new chunk, min chunk is 64k
-        size_t chunk_size = std::max(_min_chunk_size, size - pos);
-        chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
-        _write_buf = ByteBuffer::allocate(chunk_size);
-        _write_buf->put_bytes(data + pos, size - pos);
-        return Status::OK();
-    }
-
-    Status append(const ByteBufferPtr& buf) override {
-        if (_write_buf != nullptr) {
-            _write_buf->flip();
-            RETURN_IF_ERROR(_append(_write_buf));
-            _write_buf.reset();
-        }
-        return _append(buf);
-    }
-
-    // If _total_length == -1, this should be a Kafka routine load task,
-    // just get the next buffer directly from the buffer queue, because one 
buffer contains a complete piece of data.
-    // Otherwise, this should be a stream load task that needs to read the 
specified amount of data.
-    Status read_one_message(std::unique_ptr<uint8_t[]>* data, int64_t* length) 
override {
-        if (_total_length < -1) {
-            return Status::InternalError("invalid, _total_length is: {}", 
_total_length);
-        } else if (_total_length == 0) {
-            // no data
-            *length = 0;
-            return Status::OK();
-        }
-
-        if (_total_length == -1) {
-            return _read_next_buffer(data, length);
-        }
-
-        // _total_length > 0, read the entire data
-        data->reset(new uint8_t[_total_length]);
-        bool eof = false;
-        Status st = read(data->get(), _total_length, length, &eof);
-        if (eof) {
-            *length = 0;
-        }
-        return st;
-    }
-
-    Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* 
eof) override {
-        
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-        *bytes_read = 0;
-        while (*bytes_read < data_size) {
-            std::unique_lock<std::mutex> l(_lock);
-            while (!_cancelled && !_finished && _buf_queue.empty()) {
-                _get_cond.wait(l);
-            }
-            // cancelled
-            if (_cancelled) {
-                return Status::InternalError("cancelled: {}", 
_cancelled_reason);
-            }
-            // finished
-            if (_buf_queue.empty()) {
-                DCHECK(_finished);
-                data_size = *bytes_read;
-                *eof = (*bytes_read == 0);
-                return Status::OK();
-            }
-            auto buf = _buf_queue.front();
-            int64_t copy_size = std::min(data_size - *bytes_read, 
(int64_t)buf->remaining());
-            buf->get_bytes((char*)data + *bytes_read, copy_size);
-            *bytes_read += copy_size;
-            if (!buf->has_remaining()) {
-                _buf_queue.pop_front();
-                _buffered_bytes -= buf->limit;
-                _put_cond.notify_one();
-            }
-        }
-        DCHECK(*bytes_read == data_size)
-                << "*bytes_read=" << *bytes_read << ", data_size=" << 
data_size;
-        *eof = false;
-        return Status::OK();
-    }
-
-    Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* 
out) override {
-        return Status::InternalError("Not implemented");
-    }
-
-    int64_t size() override { return 0; }
-
-    Status seek(int64_t position) override { return Status::InternalError("Not 
implemented"); }
-
-    Status tell(int64_t* position) override { return 
Status::InternalError("Not implemented"); }
-
-    // called when consumer finished
-    void close() override { cancel("closed"); }
-
-    bool closed() override { return _cancelled; }
-
-    // called when producer finished
-    Status finish() override {
-        if (_write_buf != nullptr) {
-            _write_buf->flip();
-            _append(_write_buf);
-            _write_buf.reset();
-        }
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            _finished = true;
-        }
-        _get_cond.notify_all();
-        return Status::OK();
-    }
-
-    // called when producer/consumer failed
-    void cancel(const std::string& reason) override {
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            _cancelled = true;
-            _cancelled_reason = reason;
-        }
-        _get_cond.notify_all();
-        _put_cond.notify_all();
-    }
-
-private:
-    // read the next buffer from _buf_queue
-    Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* 
length) {
-        std::unique_lock<std::mutex> l(_lock);
-        while (!_cancelled && !_finished && _buf_queue.empty()) {
-            _get_cond.wait(l);
-        }
-        // cancelled
-        if (_cancelled) {
-            return Status::InternalError("cancelled: {}", _cancelled_reason);
-        }
-        // finished
-        if (_buf_queue.empty()) {
-            DCHECK(_finished);
-            data->reset();
-            *length = 0;
-            return Status::OK();
-        }
-        auto buf = _buf_queue.front();
-        *length = buf->remaining();
-        data->reset(new uint8_t[*length]);
-        buf->get_bytes((char*)(data->get()), *length);
-        _buf_queue.pop_front();
-        _buffered_bytes -= buf->limit;
-        if (_use_proto) {
-            PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
-            _proto_buffered_bytes -= (sizeof(PDataRow*) + 
(*ptr)->GetCachedSize());
-        }
-        _put_cond.notify_one();
-        return Status::OK();
-    }
-
-    Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) {
-        {
-            std::unique_lock<std::mutex> l(_lock);
-            // if _buf_queue is empty, we append this buf without size check
-            if (_use_proto) {
-                while (!_cancelled && !_buf_queue.empty() &&
-                       (_proto_buffered_bytes + proto_byte_size > 
_max_buffered_bytes)) {
-                    _put_cond.wait(l);
-                }
-            } else {
-                while (!_cancelled && !_buf_queue.empty() &&
-                       _buffered_bytes + buf->remaining() > 
_max_buffered_bytes) {
-                    _put_cond.wait(l);
-                }
-            }
-            if (_cancelled) {
-                return Status::InternalError("cancelled: {}", 
_cancelled_reason);
-            }
-            _buf_queue.push_back(buf);
-            if (_use_proto) {
-                _proto_buffered_bytes += proto_byte_size;
-            } else {
-                _buffered_bytes += buf->remaining();
-            }
-        }
-        _get_cond.notify_one();
-        return Status::OK();
-    }
-
-    // Blocking queue
-    std::mutex _lock;
-    size_t _buffered_bytes;
-    size_t _proto_buffered_bytes;
-    size_t _max_buffered_bytes;
-    size_t _min_chunk_size;
-    // The total amount of data expected to be read.
-    // In some scenarios, such as loading json format data through stream load,
-    // the data needs to be completely read before it can be parsed,
-    // so the total size of the data needs to be known.
-    // The default is -1, which means that the data arrives in a stream
-    // and the length is unknown.
-    // size_t is unsigned, so use int64_t
-    int64_t _total_length = -1;
-    bool _use_proto = false;
-    std::deque<ByteBufferPtr> _buf_queue;
-    std::condition_variable _put_cond;
-    std::condition_variable _get_cond;
-
-    ByteBufferPtr _write_buf;
-};
-
-} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 444416bce3..7fa7493bf6 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -66,7 +66,6 @@ set(HTTP_TEST_FILES
     http/http_utils_test.cpp
     http/http_client_test.cpp
     # TODO this will overide HttpChannel and make other test failed
-    # http/stream_load_test.cpp
     # http/metrics_action_test.cpp
 )
 set(IO_TEST_FILES
@@ -145,10 +144,8 @@ set(RUNTIME_TEST_FILES
     runtime/string_value_test.cpp
     runtime/fragment_mgr_test.cpp
     runtime/mem_limit_test.cpp
-    runtime/stream_load_pipe_test.cpp
     runtime/snapshot_loader_test.cpp
     runtime/user_function_cache_test.cpp
-    runtime/kafka_consumer_pipe_test.cpp
     runtime/routine_load_task_executor_test.cpp
     runtime/small_file_mgr_test.cpp
     runtime/heartbeat_flags_test.cpp
diff --git a/be/test/http/stream_load_test.cpp 
b/be/test/http/stream_load_test.cpp
deleted file mode 100644
index 1609eedf80..0000000000
--- a/be/test/http/stream_load_test.cpp
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "http/action/stream_load.h"
-
-#include <event2/http.h>
-#include <event2/http_struct.h>
-#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-
-#include "exec/schema_scanner/schema_helper.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "http/http_channel.h"
-#include "http/http_request.h"
-#include "runtime/exec_env.h"
-#include "runtime/stream_load/load_stream_mgr.h"
-#include "runtime/stream_load/stream_load_executor.h"
-#include "util/brpc_client_cache.h"
-#include "util/cpu_info.h"
-
-struct mg_connection;
-
-namespace doris {
-
-std::string k_response_str;
-
-// Send Unauthorized status with basic challenge
-void HttpChannel::send_basic_challenge(HttpRequest* req, const std::string& 
realm) {}
-
-void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {}
-
-void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {}
-
-void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const 
std::string& content) {
-    k_response_str = content;
-}
-
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t 
size) {}
-
-extern TLoadTxnBeginResult k_stream_load_begin_result;
-extern TLoadTxnCommitResult k_stream_load_commit_result;
-extern TLoadTxnRollbackResult k_stream_load_rollback_result;
-extern TStreamLoadPutResult k_stream_load_put_result;
-extern Status k_stream_load_plan_status;
-
-class StreamLoadActionTest : public testing::Test {
-public:
-    StreamLoadActionTest() {}
-    virtual ~StreamLoadActionTest() {}
-    void SetUp() override {
-        k_stream_load_begin_result = TLoadTxnBeginResult();
-        k_stream_load_commit_result = TLoadTxnCommitResult();
-        k_stream_load_rollback_result = TLoadTxnRollbackResult();
-        k_stream_load_put_result = TStreamLoadPutResult();
-        k_stream_load_plan_status = Status::OK();
-        k_response_str = "";
-        config::streaming_load_max_mb = 1;
-
-        _env._master_info = new TMasterInfo();
-        _env._load_stream_mgr = new LoadStreamMgr();
-        _env._internal_client_cache = new 
BrpcClientCache<PBackendService_Stub>();
-        _env._function_client_cache = new 
BrpcClientCache<PFunctionService_Stub>();
-        _env._stream_load_executor = new StreamLoadExecutor(&_env);
-
-        _evhttp_req = evhttp_request_new(nullptr, nullptr);
-    }
-    void TearDown() override {
-        delete _env._internal_client_cache;
-        _env._internal_client_cache = nullptr;
-        delete _env._function_client_cache;
-        _env._function_client_cache = nullptr;
-        delete _env._load_stream_mgr;
-        _env._load_stream_mgr = nullptr;
-        delete _env._master_info;
-        _env._master_info = nullptr;
-        delete _env._stream_load_executor;
-        _env._stream_load_executor = nullptr;
-
-        if (_evhttp_req != nullptr) {
-            evhttp_request_free(_evhttp_req);
-        }
-    }
-
-private:
-    ExecEnv _env;
-    evhttp_request* _evhttp_req = nullptr;
-};
-
-TEST_F(StreamLoadActionTest, no_auth) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, normal) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-
-    struct evhttp_request ev_req;
-    ev_req.remote_host = nullptr;
-    request._ev_req = &ev_req;
-
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "0");
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Success", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, put_fail) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-
-    struct evhttp_request ev_req;
-    ev_req.remote_host = nullptr;
-    request._ev_req = &ev_req;
-
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
-    Status status = Status::InternalError("TestFail");
-    status.to_thrift(&k_stream_load_put_result.status);
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, commit_fail) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-    struct evhttp_request ev_req;
-    ev_req.remote_host = nullptr;
-    request._ev_req = &ev_req;
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
-    Status status = Status::InternalError("TestFail");
-    status.to_thrift(&k_stream_load_commit_result.status);
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-TEST_F(StreamLoadActionTest, begin_fail) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-    struct evhttp_request ev_req;
-    ev_req.remote_host = nullptr;
-    request._ev_req = &ev_req;
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
-    Status status = Status::InternalError("TestFail");
-    status.to_thrift(&k_stream_load_begin_result.status);
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-#if 0
-TEST_F(StreamLoadActionTest, receive_failed) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::TRANSFER_ENCODING, "chunked");
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-#endif
-
-TEST_F(StreamLoadActionTest, plan_fail) {
-    StreamLoadAction action(&_env);
-
-    HttpRequest request(_evhttp_req);
-    struct evhttp_request ev_req;
-    ev_req.remote_host = nullptr;
-    request._ev_req = &ev_req;
-    request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
-    request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
-    k_stream_load_plan_status = Status::InternalError("TestFail");
-    request.set_handler(&action);
-    action.on_header(&request);
-    action.handle(&request);
-
-    rapidjson::Document doc;
-    doc.Parse(k_response_str.c_str());
-    EXPECT_STREQ("Fail", doc["Status"].GetString());
-}
-
-} // namespace doris
diff --git a/be/test/runtime/kafka_consumer_pipe_test.cpp b/be/test/runtime/kafka_consumer_pipe_test.cpp
deleted file mode 100644
index ee10e42eee..0000000000
--- a/be/test/runtime/kafka_consumer_pipe_test.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/routine_load/kafka_consumer_pipe.h"
-
-#include <gtest/gtest.h>
-
-namespace doris {
-
-class KafkaConsumerPipeTest : public testing::Test {
-public:
-    KafkaConsumerPipeTest() {}
-    virtual ~KafkaConsumerPipeTest() {}
-
-    void SetUp() override {}
-
-    void TearDown() override {}
-
-private:
-};
-
-TEST_F(KafkaConsumerPipeTest, append_read) {
-    KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024);
-
-    std::string msg1 = "i have a dream";
-    std::string msg2 = "This is from kafka";
-
-    Status st;
-    st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
-    EXPECT_TRUE(st.ok());
-    st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
-    EXPECT_TRUE(st.ok());
-    st = k_pipe.finish();
-    EXPECT_TRUE(st.ok());
-
-    char buf[1024];
-    int64_t data_size = 1024;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(read_bytes, msg1.length() + msg2.length() + 2);
-    EXPECT_EQ(eof, false);
-
-    data_size = 1024;
-    st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(read_bytes, 0);
-    EXPECT_EQ(eof, true);
-}
-
-} // namespace doris
diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp
deleted file mode 100644
index 93c2cd91c2..0000000000
--- a/be/test/runtime/stream_load_pipe_test.cpp
+++ /dev/null
@@ -1,261 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/stream_load/stream_load_pipe.h"
-
-#include <gtest/gtest.h>
-
-#include <thread>
-
-namespace doris {
-
-class StreamLoadPipeTest : public testing::Test {
-public:
-    StreamLoadPipeTest() {}
-    virtual ~StreamLoadPipeTest() {}
-    void SetUp() override {}
-};
-
-TEST_F(StreamLoadPipeTest, append_buffer) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        int k = 0;
-        for (int i = 0; i < 2; ++i) {
-            auto byte_buf = ByteBuffer::allocate(64);
-            char buf[64];
-            for (int j = 0; j < 64; ++j) {
-                buf[j] = '0' + (k++ % 10);
-            }
-            byte_buf->put_bytes(buf, 64);
-            byte_buf->flip();
-            pipe.append(byte_buf);
-        }
-        pipe.finish();
-    };
-    std::thread t1(appender);
-
-    char buf[256];
-    int64_t buf_len = 256;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(128, read_bytes);
-    EXPECT_FALSE(eof);
-    for (int i = 0; i < 128; ++i) {
-        EXPECT_EQ('0' + (i % 10), buf[i]);
-    }
-    st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(0, read_bytes);
-    EXPECT_TRUE(eof);
-
-    t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_bytes) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        for (int i = 0; i < 128; ++i) {
-            char buf = '0' + (i % 10);
-            pipe.append(&buf, 1);
-        }
-        pipe.finish();
-    };
-    std::thread t1(appender);
-
-    char buf[256];
-    int64_t buf_len = 256;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(128, read_bytes);
-    EXPECT_FALSE(eof);
-    for (int i = 0; i < 128; ++i) {
-        EXPECT_EQ('0' + (i % 10), buf[i]);
-    }
-    st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(0, read_bytes);
-    EXPECT_TRUE(eof);
-
-    t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_bytes2) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        for (int i = 0; i < 128; ++i) {
-            char buf = '0' + (i % 10);
-            pipe.append(&buf, 1);
-        }
-        pipe.finish();
-    };
-    std::thread t1(appender);
-
-    char buf[128];
-    int64_t buf_len = 62;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(62, read_bytes);
-    EXPECT_FALSE(eof);
-    for (int i = 0; i < 62; ++i) {
-        EXPECT_EQ('0' + (i % 10), buf[i]);
-    }
-    for (int i = 62; i < 128; ++i) {
-        char ch;
-        buf_len = 1;
-        auto st = pipe.read((uint8_t*)&ch, buf_len, &read_bytes, &eof);
-        EXPECT_TRUE(st.ok());
-        EXPECT_EQ(1, read_bytes);
-        EXPECT_FALSE(eof);
-        EXPECT_EQ('0' + (i % 10), ch);
-    }
-    st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(0, read_bytes);
-    EXPECT_TRUE(eof);
-
-    t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, append_mix) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        // 10
-        int k = 0;
-        for (int i = 0; i < 10; ++i) {
-            char buf = '0' + (k++ % 10);
-            pipe.append(&buf, 1);
-        }
-        // 60
-        {
-            auto byte_buf = ByteBuffer::allocate(60);
-            char buf[60];
-            for (int j = 0; j < 60; ++j) {
-                buf[j] = '0' + (k++ % 10);
-            }
-            byte_buf->put_bytes(buf, 60);
-            byte_buf->flip();
-            pipe.append(byte_buf);
-        }
-        // 8
-        for (int i = 0; i < 8; ++i) {
-            char buf = '0' + (k++ % 10);
-            pipe.append(&buf, 1);
-        }
-        // 50
-        {
-            auto byte_buf = ByteBuffer::allocate(50);
-            char buf[50];
-            for (int j = 0; j < 50; ++j) {
-                buf[j] = '0' + (k++ % 10);
-            }
-            byte_buf->put_bytes(buf, 50);
-            byte_buf->flip();
-            pipe.append(byte_buf);
-        }
-        pipe.finish();
-    };
-    std::thread t1(appender);
-
-    char buf[128];
-    int64_t buf_len = 128;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(128, read_bytes);
-    EXPECT_FALSE(eof);
-    for (int i = 0; i < 128; ++i) {
-        EXPECT_EQ('0' + (i % 10), buf[i]);
-    }
-    st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_TRUE(st.ok());
-    EXPECT_EQ(0, read_bytes);
-    EXPECT_TRUE(eof);
-
-    t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, cancel) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        int k = 0;
-        for (int i = 0; i < 10; ++i) {
-            char buf = '0' + (k++ % 10);
-            pipe.append(&buf, 1);
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
-        pipe.cancel("test");
-    };
-    std::thread t1(appender);
-
-    char buf[128];
-    int64_t buf_len = 128;
-    int64_t read_bytes = 0;
-    bool eof = false;
-    auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
-    EXPECT_FALSE(st.ok());
-    t1.join();
-}
-
-TEST_F(StreamLoadPipeTest, close) {
-    StreamLoadPipe pipe(66, 64);
-
-    auto appender = [&pipe] {
-        int k = 0;
-        {
-            auto byte_buf = ByteBuffer::allocate(64);
-            char buf[64];
-            for (int j = 0; j < 64; ++j) {
-                buf[j] = '0' + (k++ % 10);
-            }
-            byte_buf->put_bytes(buf, 64);
-            byte_buf->flip();
-            pipe.append(byte_buf);
-        }
-        {
-            auto byte_buf = ByteBuffer::allocate(64);
-            char buf[64];
-            for (int j = 0; j < 64; ++j) {
-                buf[j] = '0' + (k++ % 10);
-            }
-            byte_buf->put_bytes(buf, 64);
-            byte_buf->flip();
-            auto st = pipe.append(byte_buf);
-            EXPECT_FALSE(st.ok());
-        }
-    };
-    std::thread t1(appender);
-
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
-    pipe.close();
-
-    t1.join();
-}
-
-} // namespace doris
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index 05fca396dc..1182e1cb8e 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -30,7 +30,6 @@
 #include "runtime/exec_env.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/runtime_state.h"
-#include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/types.h"
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
@@ -344,7 +343,6 @@ public:
         k_add_batch_status = Status::OK();
         _env = ExecEnv::GetInstance();
         _env->_master_info = new TMasterInfo();
-        _env->_load_stream_mgr = new LoadStreamMgr();
         _env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
         _env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
         ThreadPoolBuilder("SendBatchThreadPool")
@@ -359,7 +357,6 @@ public:
     void TearDown() override {
         SAFE_DELETE(_env->_internal_client_cache);
         SAFE_DELETE(_env->_function_client_cache);
-        SAFE_DELETE(_env->_load_stream_mgr);
         SAFE_DELETE(_env->_master_info);
         if (_server) {
             _server->Stop(100);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to