This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a807978882 [refactor](non-vec) Remove rowbatch code from delta writer and some rowbatch related code (#15349)
a807978882 is described below

commit a807978882dfaea978201998d874012e9474bf70
Author: yiguolei <[email protected]>
AuthorDate: Mon Dec 26 08:54:51 2022 +0800

    [refactor](non-vec) Remove rowbatch code from delta writer and some rowbatch related code (#15349)
    
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/exec/data_sink.h                            |   1 -
 be/src/exec/exec_node.cpp                          |  45 --
 be/src/exec/exec_node.h                            |  45 --
 be/src/exec/row_batch_list.h                       | 130 ----
 be/src/exec/table_connector.cpp                    | 109 ----
 be/src/exec/table_connector.h                      |   4 -
 be/src/exec/tablet_info.h                          |   1 -
 be/src/olap/delta_writer.cpp                       |  26 -
 be/src/olap/delta_writer.h                         |   2 -
 be/src/runtime/fragment_mgr.cpp                    |   1 -
 be/src/runtime/load_channel.h                      |  12 +-
 be/src/runtime/plan_fragment_executor.h            |   1 -
 be/src/runtime/result_writer.h                     |   1 -
 be/src/runtime/tablets_channel.cpp                 |  11 +-
 be/src/service/internal_service.cpp                |  60 --
 be/src/service/internal_service.h                  |  15 -
 be/src/vec/core/block.cpp                          |  40 --
 be/src/vec/core/block.h                            |   7 -
 be/src/vec/exec/vbroker_scan_node.cpp              |   2 -
 be/src/vec/exec/vbroker_scan_node.h                |   1 -
 be/src/vec/exec/vtable_function_node.h             |   1 -
 be/src/vec/runtime/vsorted_run_merger.h            |   6 -
 be/src/vec/sink/vdata_stream_sender.h              |   1 -
 be/src/vec/sink/vmysql_result_writer.h             |   1 -
 be/src/vec/sink/vresult_sink.h                     |   1 -
 be/src/vec/sink/vtablet_sink.cpp                   |   1 -
 be/src/vec/sink/vtablet_sink.h                     |   7 -
 be/test/CMakeLists.txt                             |   2 -
 be/test/runtime/data_stream_test.cpp               | 667 ---------------------
 be/test/runtime/load_channel_mgr_test.cpp          | 474 ---------------
 be/test/runtime/result_sink_test.cpp               |  93 ---
 be/test/vec/exec/vtablet_sink_test.cpp             |  25 -
 .../apache/doris/utframe/MockedBackendFactory.java |   6 -
 gensrc/proto/internal_service.proto                |   2 -
 34 files changed, 4 insertions(+), 1797 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 299e1c5376..f558678308 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -32,7 +32,6 @@
 namespace doris {
 
 class ObjectPool;
-class RowBatch;
 class RuntimeProfile;
 class RuntimeState;
 class TPlanFragmentExecParams;
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 616bf9f5e0..ab9f52f4af 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -64,51 +64,6 @@ namespace doris {
 
 const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) : 
BlockingQueue<RowBatch*>(max_batches) {}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
-    DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(RowBatch* batch) {
-    if (!blocking_put(batch)) {
-        std::lock_guard<std::mutex> lock(lock_);
-        cleanup_queue_.push_back(batch);
-    }
-}
-
-bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch, int64_t 
timeout_micros) {
-    // return blocking_put_with_timeout(batch, timeout_micros);
-    return blocking_put(batch);
-}
-
-RowBatch* ExecNode::RowBatchQueue::GetBatch() {
-    RowBatch* result = nullptr;
-    if (blocking_get(&result)) {
-        return result;
-    }
-    return nullptr;
-}
-
-int ExecNode::RowBatchQueue::Cleanup() {
-    int num_io_buffers = 0;
-
-    // RowBatch* batch = nullptr;
-    // while ((batch = GetBatch()) != nullptr) {
-    //   num_io_buffers += batch->num_io_buffers();
-    //   delete batch;
-    // }
-
-    std::lock_guard<std::mutex> l(lock_);
-    for (std::list<RowBatch*>::iterator it = cleanup_queue_.begin(); it != 
cleanup_queue_.end();
-         ++it) {
-        // num_io_buffers += (*it)->num_io_buffers();
-        delete *it;
-    }
-    cleanup_queue_.clear();
-    return num_io_buffers;
-}
-
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : _id(tnode.node_id),
           _type(tnode.node_type),
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f5af72ac61..ff95d96934 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -40,7 +40,6 @@ class Expr;
 class ExprContext;
 class ObjectPool;
 class Counters;
-class RowBatch;
 class RuntimeState;
 class TPlan;
 class TupleRow;
@@ -266,50 +265,6 @@ protected:
     /// Only use in vectorized exec engine try to do projections to trans 
_row_desc -> _output_row_desc
     Status do_projections(vectorized::Block* origin_block, vectorized::Block* 
output_block);
 
-    /// Extends blocking queue for row batches. Row batches have a property 
that
-    /// they must be processed in the order they were produced, even in 
cancellation
-    /// paths. Preceding row batches can contain ptrs to memory in subsequent 
row batches
-    /// and we need to make sure those ptrs stay valid.
-    /// Row batches that are added after Shutdown() are queued in another 
queue, which can
-    /// be cleaned up during Close().
-    /// All functions are thread safe.
-    class RowBatchQueue : public BlockingQueue<RowBatch*> {
-    public:
-        /// max_batches is the maximum number of row batches that can be 
queued.
-        /// When the queue is full, producers will block.
-        RowBatchQueue(int max_batches);
-        ~RowBatchQueue();
-
-        /// Adds a batch to the queue. This is blocking if the queue is full.
-        void AddBatch(RowBatch* batch);
-
-        /// Adds a batch to the queue. If the queue is full, this blocks until 
space becomes
-        /// available or 'timeout_micros' has elapsed.
-        /// Returns true if the element was added to the queue, false if it 
wasn't. If this
-        /// method returns false, the queue didn't take ownership of the batch 
and it must be
-        /// managed externally.
-        bool AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros);
-
-        /// Gets a row batch from the queue. Returns nullptr if there are no 
more.
-        /// This function blocks.
-        /// Returns nullptr after Shutdown().
-        RowBatch* GetBatch();
-
-        /// Deletes all row batches in cleanup_queue_. Not valid to call 
AddBatch()
-        /// after this is called.
-        /// Returns the number of io buffers that were released (for debug 
tracking)
-        int Cleanup();
-
-    private:
-        /// Lock protecting cleanup_queue_
-        // SpinLock lock_;
-        // TODO(dhc): need to modify spinlock
-        std::mutex lock_;
-
-        /// Queue of orphaned row batches
-        std::list<RowBatch*> cleanup_queue_;
-    };
-
     int _id; // unique w/in single plan tree
     TPlanNodeType::type _type;
     ObjectPool* _pool;
diff --git a/be/src/exec/row_batch_list.h b/be/src/exec/row_batch_list.h
deleted file mode 100644
index a81f0aae86..0000000000
--- a/be/src/exec/row_batch_list.h
+++ /dev/null
@@ -1,130 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// 
https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/row-batch-list.h
-// and modified by Doris
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-
-namespace doris {
-
-class TupleRow;
-class RowDescriptor;
-class MemPool;
-
-// A simple list structure for RowBatches that provides an interface for
-// iterating over the TupleRows.
-class RowBatchList {
-public:
-    RowBatchList() : _total_num_rows(0) {}
-    virtual ~RowBatchList() {}
-
-    // A simple iterator used to scan over all the rows stored in the list.
-    class TupleRowIterator {
-    public:
-        // Dummy constructor
-        TupleRowIterator() : _list(nullptr), _row_idx(0) {}
-        virtual ~TupleRowIterator() {}
-
-        // Returns true if this iterator is at the end, i.e. get_row() cannot 
be called.
-        bool at_end() { return _batch_it == _list->_row_batches.end(); }
-
-        // Returns the current row. Callers must check the iterator is not 
at_end() before
-        // calling get_row().
-        TupleRow* get_row() {
-            DCHECK(!at_end());
-            return (*_batch_it)->get_row(_row_idx);
-        }
-
-        // Increments the iterator. No-op if the iterator is at the end.
-        void next() {
-            if (_batch_it == _list->_row_batches.end()) {
-                return;
-            }
-
-            if (++_row_idx == (*_batch_it)->num_rows()) {
-                ++_batch_it;
-                _row_idx = 0;
-            }
-        }
-
-    private:
-        friend class RowBatchList;
-
-        TupleRowIterator(RowBatchList* list)
-                : _list(list), _batch_it(list->_row_batches.begin()), 
_row_idx(0) {}
-
-        RowBatchList* _list;
-        std::vector<RowBatch*>::iterator _batch_it;
-        int64_t _row_idx;
-    };
-
-    // Add the 'row_batch' to the list. The RowBatch* and all of its resources 
are owned
-    // by the caller.
-    void add_row_batch(RowBatch* row_batch) {
-        if (row_batch->num_rows() == 0) {
-            return;
-        }
-
-        _row_batches.push_back(row_batch);
-        _total_num_rows += row_batch->num_rows();
-    }
-
-    // Resets the list.
-    void reset() {
-        _row_batches.clear();
-        _total_num_rows = 0;
-    }
-
-    // Outputs a debug string containing the contents of the list.
-    std::string debug_string(const RowDescriptor& desc) {
-        std::stringstream out;
-        out << "RowBatchList(";
-        out << "num_rows=" << _total_num_rows << "; ";
-        RowBatchList::TupleRowIterator it = iterator();
-
-        while (!it.at_end()) {
-            out << " " << it.get_row()->to_string(desc);
-            it.next();
-        }
-
-        out << " )";
-        return out.str();
-    }
-
-    // Returns the total number of rows in all row batches.
-    int64_t total_num_rows() { return _total_num_rows; }
-
-    // Returns a new iterator over all the tuple rows.
-    TupleRowIterator iterator() { return TupleRowIterator(this); }
-
-private:
-    friend class TupleRowIterator;
-
-    std::vector<RowBatch*> _row_batches;
-
-    // Total number of rows
-    int64_t _total_num_rows;
-};
-
-} // namespace doris
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index fa725832d3..e342f9abd6 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -45,115 +45,6 @@ std::u16string TableConnector::utf8_to_u16string(const 
char* first, const char*
     return utf8_utf16_cvt.from_bytes(first, last);
 }
 
-Status TableConnector::append(const std::string& table_name, RowBatch* batch,
-                              const std::vector<ExprContext*>& 
output_expr_ctxs,
-                              uint32_t start_send_row, uint32* num_rows_sent) {
-    _insert_stmt_buffer.clear();
-    std::u16string insert_stmt;
-    {
-        SCOPED_TIMER(_convert_tuple_timer);
-        fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", 
table_name);
-
-        int num_rows = batch->num_rows();
-        for (int i = start_send_row; i < num_rows; ++i) {
-            auto row = batch->get_row(i);
-            (*num_rows_sent)++;
-
-            // Construct insert statement of odbc table
-            int num_columns = output_expr_ctxs.size();
-            for (int j = 0; j < num_columns; ++j) {
-                if (j != 0) {
-                    fmt::format_to(_insert_stmt_buffer, "{}", ", ");
-                }
-                void* item = output_expr_ctxs[j]->get_value(row);
-                if (item == nullptr) {
-                    fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
-                    continue;
-                }
-                switch (output_expr_ctxs[j]->root()->type().type) {
-                case TYPE_BOOLEAN:
-                case TYPE_TINYINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<int8_t*>(item));
-                    break;
-                case TYPE_SMALLINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<int16_t*>(item));
-                    break;
-                case TYPE_INT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<int32_t*>(item));
-                    break;
-                case TYPE_BIGINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<int64_t*>(item));
-                    break;
-                case TYPE_FLOAT:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<float*>(item));
-                    break;
-                case TYPE_DOUBLE:
-                    fmt::format_to(_insert_stmt_buffer, "{}", 
*static_cast<double*>(item));
-                    break;
-                case TYPE_DATE:
-                case TYPE_DATETIME: {
-                    char buf[64];
-                    const auto* time_val = (const DateTimeValue*)(item);
-                    time_val->to_string(buf);
-                    fmt::format_to(_insert_stmt_buffer, "'{}'", buf);
-                    break;
-                }
-                case TYPE_VARCHAR:
-                case TYPE_CHAR:
-                case TYPE_STRING: {
-                    const auto* string_val = (const StringValue*)(item);
-
-                    if (string_val->ptr == nullptr) {
-                        if (string_val->len == 0) {
-                            fmt::format_to(_insert_stmt_buffer, "{}", "''");
-                        } else {
-                            fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
-                        }
-                    } else {
-                        fmt::format_to(_insert_stmt_buffer, "'{}'",
-                                       fmt::basic_string_view(string_val->ptr, 
string_val->len));
-                    }
-                    break;
-                }
-                case TYPE_DECIMALV2: {
-                    const DecimalV2Value decimal_val(
-                            reinterpret_cast<const 
PackedInt128*>(item)->value);
-                    char buffer[MAX_DECIMAL_WIDTH];
-                    int output_scale = 
output_expr_ctxs[j]->root()->output_scale();
-                    int len = decimal_val.to_buffer(buffer, output_scale);
-                    _insert_stmt_buffer.append(buffer, buffer + len);
-                    break;
-                }
-                case TYPE_LARGEINT: {
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   reinterpret_cast<const 
PackedInt128*>(item)->value);
-                    break;
-                }
-                default: {
-                    return Status::InternalError("can't convert this type to 
mysql type. type = {}",
-                                                 
output_expr_ctxs[j]->root()->type().type);
-                }
-                }
-            }
-
-            if (i < num_rows - 1 && _insert_stmt_buffer.size() < 
INSERT_BUFFER_SIZE) {
-                fmt::format_to(_insert_stmt_buffer, "{}", "),(");
-            } else {
-                // batch exhausted or _insert_stmt_buffer is full, need to do 
real insert stmt
-                fmt::format_to(_insert_stmt_buffer, "{}", ")");
-                break;
-            }
-        }
-        // Translate utf8 string to utf16 to use unicode encoding
-        insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
-                                        _insert_stmt_buffer.data() + 
_insert_stmt_buffer.size());
-    }
-
-    RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
-    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
-    return Status::OK();
-}
-
 Status TableConnector::append(const std::string& table_name, 
vectorized::Block* block,
                               const std::vector<vectorized::VExprContext*>& 
output_vexpr_ctxs,
                               uint32_t start_send_row, uint32_t* num_rows_sent,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 3fa9f5f5b1..a6077b3227 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -49,10 +49,6 @@ public:
 
     virtual Status exec_write_sql(const std::u16string& insert_stmt,
                                   const fmt::memory_buffer& 
_insert_stmt_buffer) = 0;
-    //write data into table row batch
-    Status append(const std::string& table_name, RowBatch* batch,
-                  const std::vector<ExprContext*>& _output_expr_ctxs, uint32_t 
start_send_row,
-                  uint32_t* num_rows_sent);
 
     //write data into table vectorized
     Status append(const std::string& table_name, vectorized::Block* block,
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 247cd1a4f1..0b6fd00dbb 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -36,7 +36,6 @@
 namespace doris {
 
 class MemPool;
-class RowBatch;
 
 struct OlapTableIndexSchema {
     int64_t index_id;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2a0fded006..aa05bed875 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -183,32 +183,6 @@ Status DeltaWriter::write(Tuple* tuple) {
     return Status::OK();
 }
 
-Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& 
row_idxs) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init && !_is_cancelled) {
-        RETURN_NOT_OK(init());
-    }
-
-    if (_is_cancelled) {
-        return _cancel_status;
-    }
-
-    _total_received_rows += row_idxs.size();
-    for (const auto& row_idx : row_idxs) {
-        _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
-    }
-
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
-        auto s = _flush_memtable_async();
-        _reset_mem_table();
-        if (OLAP_UNLIKELY(!s.ok())) {
-            return s;
-        }
-    }
-
-    return Status::OK();
-}
-
 Status DeltaWriter::write(const vectorized::Block* block, const 
std::vector<int>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index dafd77a8f8..eac3ea75cc 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -27,7 +27,6 @@ namespace doris {
 class FlushToken;
 class MemTable;
 class MemTracker;
-class RowBatch;
 class Schema;
 class StorageEngine;
 class Tuple;
@@ -64,7 +63,6 @@ public:
     Status init();
 
     Status write(Tuple* tuple);
-    Status write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
     Status write(const vectorized::Block* block, const std::vector<int>& 
row_idxs);
 
     // flush the last memtable to flush queue, must call it before close_wait()
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0beccbdf9c..134df9f221 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1010,7 +1010,6 @@ Status FragmentMgr::exec_external_plan_fragment(const 
TScanOpenParams& params,
     per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, 
scan_ranges));
     fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
     exec_fragment_params.__set_params(fragment_exec_params);
-    // batch_size for one RowBatch
     TQueryOptions query_options;
     query_options.batch_size = params.batch_size;
     query_options.query_timeout = params.query_timeout;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 9d8e5f2f33..3581363517 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -153,15 +153,9 @@ Status LoadChannel::add_batch(const 
TabletWriterAddRequest& request,
         return st;
     }
 
-    // 2. add batch to tablets channel
-    if constexpr (std::is_same_v<TabletWriterAddRequest, 
PTabletWriterAddBatchRequest>) {
-        if (request.has_row_batch()) {
-            RETURN_IF_ERROR(channel->add_batch(request, response));
-        }
-    } else {
-        if (request.has_block()) {
-            RETURN_IF_ERROR(channel->add_batch(request, response));
-        }
+    // 2. add block to tablets channel
+    if (request.has_block()) {
+        RETURN_IF_ERROR(channel->add_batch(request, response));
     }
 
     // 3. handle eos
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 013c56471f..22bbe785b2 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -36,7 +36,6 @@ namespace doris {
 class QueryFragmentsCtx;
 class ExecNode;
 class RowDescriptor;
-class RowBatch;
 class DataSink;
 class DataStreamMgr;
 class RuntimeProfile;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index a77956c0c4..4a14a66637 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -23,7 +23,6 @@
 namespace doris {
 
 class Status;
-class RowBatch;
 class RuntimeState;
 struct TypeDescriptor;
 
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 6fdc42f2c4..7856864df4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -466,13 +466,7 @@ Status TabletsChannel::add_batch(const 
TabletWriterAddRequest& request,
         }
     }
 
-    auto get_send_data = [&]() {
-        if constexpr (std::is_same_v<TabletWriterAddRequest, 
PTabletWriterAddBatchRequest>) {
-            return RowBatch(*_row_desc, request.row_batch());
-        } else {
-            return vectorized::Block(request.block());
-        }
-    };
+    auto get_send_data = [&]() { return vectorized::Block(request.block()); };
 
     auto send_data = get_send_data();
     google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
@@ -507,9 +501,6 @@ Status TabletsChannel::add_batch(const 
TabletWriterAddRequest& request,
     return Status::OK();
 }
 
-template Status
-TabletsChannel::add_batch<PTabletWriterAddBatchRequest, 
PTabletWriterAddBatchResult>(
-        PTabletWriterAddBatchRequest const&, PTabletWriterAddBatchResult*);
 template Status
 TabletsChannel::add_batch<PTabletWriterAddBlockRequest, 
PTabletWriterAddBlockResult>(
         PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index e01b83209e..af956b58ca 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -231,66 +231,6 @@ void 
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
     });
 }
 
-void 
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* 
cntl_base,
-                                                   const 
PTabletWriterAddBatchRequest* request,
-                                                   
PTabletWriterAddBatchResult* response,
-                                                   google::protobuf::Closure* 
done) {
-    google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-    _tablet_writer_add_batch(cntl_base, request, response, new_done);
-}
-
-void PInternalServiceImpl::tablet_writer_add_batch_by_http(
-        google::protobuf::RpcController* cntl_base, const 
::doris::PEmptyRequest* request,
-        PTabletWriterAddBatchResult* response, google::protobuf::Closure* 
done) {
-    PTabletWriterAddBatchRequest* new_request = new 
PTabletWriterAddBatchRequest();
-    google::protobuf::Closure* new_done =
-            new NewHttpClosure<PTabletWriterAddBatchRequest>(new_request, 
done);
-    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    Status st = 
attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(new_request,
-                                                                               
        cntl);
-    if (st.ok()) {
-        _tablet_writer_add_batch(cntl_base, new_request, response, new_done);
-    } else {
-        st.to_protobuf(response->mutable_status());
-    }
-}
-
-void 
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcController* 
cntl_base,
-                                                    const 
PTabletWriterAddBatchRequest* request,
-                                                    
PTabletWriterAddBatchResult* response,
-                                                    google::protobuf::Closure* 
done) {
-    VLOG_RPC << "tablet writer add batch, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
-             << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
-    // add batch maybe cost a lot of time, and this callback thread will be 
held.
-    // this will influence query execution, because the pthreads under bthread 
may be
-    // exhausted, so we put this to a local thread pool to process
-    int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([cntl_base, request, response, done, 
submit_task_time_ns, this]() {
-        int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
-        brpc::ClosureGuard closure_guard(done);
-        int64_t execution_time_ns = 0;
-        {
-            SCOPED_RAW_TIMER(&execution_time_ns);
-
-            // TODO(zxy) delete in 1.2 version
-            brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-            
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, 
cntl);
-
-            auto st = _exec_env->load_channel_mgr()->add_batch(*request, 
response);
-            if (!st.ok()) {
-                LOG(WARNING) << "tablet writer add batch failed, message=" << 
st
-                             << ", id=" << request->id() << ", index_id=" << 
request->index_id()
-                             << ", sender_id=" << request->sender_id()
-                             << ", backend id=" << request->backend_id();
-            }
-            st.to_protobuf(response->mutable_status());
-        }
-        response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
-        response->set_wait_execution_time_us(wait_execution_time_ns / 
NANOS_PER_MICRO);
-    });
-}
-
 void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* 
controller,
                                                 const 
PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* 
response,
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 3ea3655974..9b2a4db254 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -78,16 +78,6 @@ public:
                             PTabletWriterOpenResult* response,
                             google::protobuf::Closure* done) override;
 
-    void tablet_writer_add_batch(google::protobuf::RpcController* controller,
-                                 const PTabletWriterAddBatchRequest* request,
-                                 PTabletWriterAddBatchResult* response,
-                                 google::protobuf::Closure* done) override;
-
-    void tablet_writer_add_batch_by_http(google::protobuf::RpcController* 
controller,
-                                         const ::doris::PEmptyRequest* request,
-                                         PTabletWriterAddBatchResult* response,
-                                         google::protobuf::Closure* done) 
override;
-
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
@@ -178,11 +168,6 @@ private:
                          ::doris::PTransmitDataResult* response, 
::google::protobuf::Closure* done,
                          const Status& extract_st);
 
-    void _tablet_writer_add_batch(google::protobuf::RpcController* controller,
-                                  const PTabletWriterAddBatchRequest* request,
-                                  PTabletWriterAddBatchResult* response,
-                                  google::protobuf::Closure* done);
-
     void _tablet_writer_add_block(google::protobuf::RpcController* controller,
                                   const PTabletWriterAddBlockRequest* request,
                                   PTabletWriterAddBlockResult* response,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 0f4fe065f2..571a391eb0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -793,46 +793,6 @@ Status Block::serialize(int be_exec_version, PBlock* 
pblock,
     return Status::OK();
 }
 
-void Block::serialize(RowBatch* output_batch, const RowDescriptor& row_desc) {
-    auto num_rows = rows();
-    auto mem_pool = output_batch->tuple_data_pool();
-
-    for (int i = 0; i < num_rows; ++i) {
-        auto tuple_row = output_batch->get_row(i);
-        const auto& tuple_descs = row_desc.tuple_descriptors();
-        auto column_offset = 0;
-
-        for (int j = 0; j < tuple_descs.size(); ++j) {
-            auto tuple_desc = tuple_descs[j];
-            tuple_row->set_tuple(j, deep_copy_tuple(*tuple_desc, mem_pool, i, 
column_offset));
-            column_offset += tuple_desc->slots().size();
-        }
-        output_batch->commit_last_row();
-    }
-}
-
-doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc, 
MemPool* pool, int row,
-                                     int column_offset, bool padding_char) {
-    auto dst = 
reinterpret_cast<doris::Tuple*>(pool->allocate(desc.byte_size()));
-
-    for (int i = 0; i < desc.slots().size(); ++i) {
-        auto slot_desc = desc.slots()[i];
-        auto& type_desc = slot_desc->type();
-        const auto& column = get_by_position(column_offset + i).column;
-        const auto& data_ref =
-                type_desc.type != TYPE_ARRAY ? column->get_data_at(row) : 
StringRef();
-        bool is_null = is_column_data_null(slot_desc->type(), data_ref, 
column, row);
-        if (is_null) {
-            dst->set_null(slot_desc->null_indicator_offset());
-        } else {
-            dst->set_not_null(slot_desc->null_indicator_offset());
-            deep_copy_slot(dst->get_slot(slot_desc->tuple_offset()), pool, 
type_desc, data_ref,
-                           column.get(), row, padding_char);
-        }
-    }
-    return dst;
-}
-
 inline bool Block::is_column_data_null(const doris::TypeDescriptor& type_desc,
                                        const StringRef& data_ref, const 
IColumn* column, int row) {
     if (type_desc.type != TYPE_ARRAY) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 6b7cc9d5a1..803fb82f97 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -40,7 +40,6 @@
 namespace doris {
 
 class MemPool;
-class RowBatch;
 class RowDescriptor;
 class Status;
 class Tuple;
@@ -280,9 +279,6 @@ public:
                      size_t* compressed_bytes, segment_v2::CompressionTypePB 
compression_type,
                      bool allow_transfer_large_data = false) const;
 
-    // serialize block to PRowbatch
-    void serialize(RowBatch*, const RowDescriptor&);
-
     std::unique_ptr<Block> create_same_struct_block(size_t size) const;
 
     /** Compares (*this) n-th row and rhs m-th row.
@@ -346,9 +342,6 @@ public:
         return res;
     }
 
-    doris::Tuple* deep_copy_tuple(const TupleDescriptor&, MemPool*, int, int,
-                                  bool padding_char = false);
-
     // for String type or Array<String> type
     void shrink_char_type_column_suffix_zero(const std::vector<size_t>& 
char_type_idx);
 
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp 
b/be/src/vec/exec/vbroker_scan_node.cpp
index e79d3ce104..ac96f97965 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -211,8 +211,6 @@ Status VBrokerScanNode::close(RuntimeState* state) {
         _scanner_threads[i].join();
     }
 
-    // Close
-    _batch_queue.clear();
     return ExecNode::close(state);
 }
 
diff --git a/be/src/vec/exec/vbroker_scan_node.h 
b/be/src/vec/exec/vbroker_scan_node.h
index 9c5e436b19..de46104088 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -88,7 +88,6 @@ private:
     std::mutex _batch_queue_lock;
     std::condition_variable _queue_reader_cond;
     std::condition_variable _queue_writer_cond;
-    std::deque<std::shared_ptr<RowBatch>> _batch_queue;
 
     int _num_running_scanners;
 
diff --git a/be/src/vec/exec/vtable_function_node.h 
b/be/src/vec/exec/vtable_function_node.h
index 85eccc047a..e28a700f46 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -110,7 +110,6 @@ private:
     std::vector<SlotDescriptor*> _child_slots;
     std::vector<SlotDescriptor*> _output_slots;
     int64_t _cur_child_offset = 0;
-    std::shared_ptr<RowBatch> _cur_child_batch;
 
     std::vector<ExprContext*> _fn_ctxs;
     std::vector<vectorized::VExprContext*> _vfn_ctxs;
diff --git a/be/src/vec/runtime/vsorted_run_merger.h 
b/be/src/vec/runtime/vsorted_run_merger.h
index 974b2f6096..24a9ce7602 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -23,7 +23,6 @@
 
 namespace doris {
 
-class RowBatch;
 class RuntimeProfile;
 
 namespace vectorized {
@@ -54,11 +53,6 @@ public:
     // Return the next block of sorted rows from this merger.
     Status get_next(Block* output_block, bool* eos);
 
-    // Do not support now
-    virtual Status get_batch(RowBatch** output_batch) {
-        return Status::InternalError("no support method get_batch(RowBatch** 
output_batch)");
-    }
-
 protected:
     const std::vector<VExprContext*>& _ordering_expr;
     const std::vector<bool>& _is_asc_order;
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 69cd1ecc9b..2163d26c4b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -35,7 +35,6 @@
 
 namespace doris {
 class ObjectPool;
-class RowBatch;
 class RuntimeState;
 class RuntimeProfile;
 class BufferControlBlock;
diff --git a/be/src/vec/sink/vmysql_result_writer.h 
b/be/src/vec/sink/vmysql_result_writer.h
index e566a30213..3f79f0e2d6 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -24,7 +24,6 @@
 
 namespace doris {
 class BufferControlBlock;
-class RowBatch;
 class MysqlRowBuffer;
 class TFetchDataResult;
 
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index 63441e3179..66fb675def 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -21,7 +21,6 @@
 
 namespace doris {
 class ObjectPool;
-class RowBatch;
 class RuntimeState;
 class RuntimeProfile;
 class BufferControlBlock;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 13aa7661ea..f042ce50b4 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1247,7 +1247,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status 
exec_status) {
     }
 
     Expr::close(_output_expr_ctxs, state);
-    _output_batch.reset();
 
     _close_status = status;
     DataSink::close(state, exec_status);
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 097bb1e6b7..1c0a1be8d1 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -303,12 +303,6 @@ protected:
     // rows number received per tablet, tablet_id -> rows_num
     std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
 
-    std::unique_ptr<RowBatch> _cur_batch;
-    PTabletWriterAddBatchRequest _cur_add_batch_request;
-    using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, 
PTabletWriterAddBatchRequest>;
-    std::queue<AddBatchReq> _pending_batches;
-    ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
-
     std::unique_ptr<vectorized::MutableBlock> _cur_mutable_block;
     PTabletWriterAddBlockRequest _cur_add_block_request;
 
@@ -543,7 +537,6 @@ private:
 
     OlapTablePartitionParam* _partition = nullptr;
     std::vector<ExprContext*> _output_expr_ctxs;
-    std::unique_ptr<RowBatch> _output_batch;
 
     VOlapTablePartitionParam* _vpartition = nullptr;
     std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index f4a218b020..ce33cd1fda 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -151,8 +151,6 @@ set(OLAP_TEST_FILES
 set(RUNTIME_TEST_FILES
     # runtime/buffer_control_block_test.cpp
     # runtime/result_buffer_mgr_test.cpp
-    # runtime/result_sink_test.cpp
-    # runtime/data_stream_test.cpp
     # runtime/parallel_executor_test.cpp
     # runtime/datetime_value_test.cpp
     # runtime/dpp_sink_internal_test.cpp
diff --git a/be/test/runtime/data_stream_test.cpp 
b/be/test/runtime/data_stream_test.cpp
deleted file mode 100644
index e3b6695dfa..0000000000
--- a/be/test/runtime/data_stream_test.cpp
+++ /dev/null
@@ -1,667 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <iostream>
-#include <thread>
-
-#include "common/status.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/BackendService.h"
-#include "gen_cpp/Descriptors_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/client_cache.h"
-#include "runtime/data_stream_mgr.h"
-#include "runtime/data_stream_recvr.h"
-#include "runtime/data_stream_sender.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "util/cpu_info.h"
-#include "util/debug_util.h"
-#include "util/disk_info.h"
-#include "util/mem_info.h"
-#include "util/thrift_server.h"
-
-using std::string;
-using std::vector;
-using std::multiset;
-
-using std::unique_ptr;
-using std::thread;
-
-namespace doris {
-
-class DorisTestBackend : public BackendServiceIf {
-public:
-    DorisTestBackend(DataStreamMgr* stream_mgr) : _mgr(stream_mgr) {}
-    virtual ~DorisTestBackend() {}
-
-    virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
-                                    const TExecPlanFragmentParams& params) {}
-
-    virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                                      const TCancelPlanFragmentParams& params) 
{}
-
-    virtual void transmit_data(TTransmitDataResult& return_val, const 
TTransmitDataParams& params) {
-        /*
-        LOG(ERROR) << "transmit_data(): instance_id=" << 
params.dest_fragment_instance_id
-            << " node_id=" << params.dest_node_id
-            << " #rows=" << params.row_batch.num_rows
-            << " eos=" << (params.eos ? "true" : "false");
-        if (!params.eos) {
-            _mgr->add_data(
-                    params.dest_fragment_instance_id,
-                    params.dest_node_id,
-                    params.row_batch,
-                    params.sender_id).set_t_status(&return_val);
-        } else {
-            Status status = _mgr->close_sender(
-                    params.dest_fragment_instance_id, params.dest_node_id, 
params.sender_id, params.be_number);
-            status.set_t_status(&return_val);
-            LOG(ERROR) << "close_sender status: " << status;
-        }
-        */
-    }
-
-    virtual void fetch_data(TFetchDataResult& return_val, const 
TFetchDataParams& params) {}
-
-    virtual void submit_tasks(TAgentResult& return_val,
-                              const std::vector<TAgentTaskRequest>& tasks) {}
-
-    virtual void make_snapshot(TAgentResult& return_val, const 
TSnapshotRequest& snapshot_request) {
-    }
-
-    virtual void release_snapshot(TAgentResult& return_val, const std::string& 
snapshot_path) {}
-
-    virtual void publish_cluster_state(TAgentResult& return_val,
-                                       const TAgentPublishRequest& request) {}
-
-    virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
-                                         const int32_t num_senders) {}
-
-    virtual void deregister_pull_load_task(TStatus& _return, const TUniqueId& 
id) {}
-
-    virtual void report_pull_load_sub_task_info(TStatus& _return,
-                                                const TPullLoadSubTaskInfo& 
task_info) {}
-
-    virtual void fetch_pull_load_task_info(TFetchPullLoadTaskInfoResult& 
_return,
-                                           const TUniqueId& id) {}
-
-    virtual void 
fetch_all_pull_load_task_infos(TFetchAllPullLoadTaskInfosResult& _return) {}
-
-private:
-    DataStreamMgr* _mgr;
-};
-
-class DataStreamTest : public testing::Test {
-protected:
-    DataStreamTest() : _runtime_state(TUniqueId(), TQueryOptions(), "", 
&_exec_env), _next_val(0) {
-        _exec_env.init_for_tests();
-        _runtime_state.init_mem_trackers(TUniqueId());
-    }
-    // null dtor to pass codestyle check
-    ~DataStreamTest() {}
-
-    virtual void SetUp() {
-        create_row_desc();
-        create_tuple_comparator();
-        create_row_batch();
-
-        _next_instance_id.lo = 0;
-        _next_instance_id.hi = 0;
-        _stream_mgr = new DataStreamMgr();
-
-        _broadcast_sink.dest_node_id = DEST_NODE_ID;
-        _broadcast_sink.output_partition.type = TPartitionType::UNPARTITIONED;
-
-        _random_sink.dest_node_id = DEST_NODE_ID;
-        _random_sink.output_partition.type = TPartitionType::RANDOM;
-
-        _hash_sink.dest_node_id = DEST_NODE_ID;
-        _hash_sink.output_partition.type = TPartitionType::HASH_PARTITIONED;
-        // there's only one column to partition on
-        TExprNode expr_node;
-        expr_node.node_type = TExprNodeType::SLOT_REF;
-        expr_node.type.types.push_back(TTypeNode());
-        expr_node.type.types.back().__isset.scalar_type = true;
-        expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
-        expr_node.num_children = 0;
-        TSlotRef slot_ref;
-        slot_ref.slot_id = 0;
-        expr_node.__set_slot_ref(slot_ref);
-        TExpr expr;
-        expr.nodes.push_back(expr_node);
-        _hash_sink.output_partition.__isset.partition_exprs = true;
-        _hash_sink.output_partition.partition_exprs.push_back(expr);
-
-        // Ensure that individual sender info addresses don't change
-        _sender_info.reserve(MAX_SENDERS);
-        _receiver_info.reserve(MAX_RECEIVERS);
-        start_backend();
-    }
-
-    const TDataStreamSink& get_sink(TPartitionType::type partition_type) {
-        switch (partition_type) {
-        case TPartitionType::UNPARTITIONED:
-            return _broadcast_sink;
-        case TPartitionType::RANDOM:
-            return _random_sink;
-        case TPartitionType::HASH_PARTITIONED:
-            return _hash_sink;
-        default:
-            DCHECK(false) << "Unhandled sink type: " << partition_type;
-        }
-        // Should never reach this.
-        return _broadcast_sink;
-    }
-
-    virtual void TearDown() {
-        _lhs_slot_ctx->close(nullptr);
-        _rhs_slot_ctx->close(nullptr);
-        _exec_env.client_cache()->test_shutdown();
-        stop_backend();
-    }
-
-    void reset() {
-        _sender_info.clear();
-        _receiver_info.clear();
-        _dest.clear();
-    }
-
-    // We reserve contiguous memory for senders in SetUp. If a test uses more
-    // senders, a DCHECK will fail and you should increase this value.
-    static const int MAX_SENDERS = 16;
-    static const int MAX_RECEIVERS = 16;
-    static const PlanNodeId DEST_NODE_ID = 1;
-    static const int BATCH_CAPACITY = 100; // rows
-    static const int PER_ROW_DATA = 8;
-    static const int TOTAL_DATA_SIZE = 8 * 1024;
-    static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / 
PER_ROW_DATA;
-
-    ObjectPool _obj_pool;
-    DescriptorTbl* _desc_tbl;
-    const RowDescriptor* _row_desc;
-    TupleRowComparator* _less_than;
-    ExecEnv _exec_env;
-    RuntimeState _runtime_state;
-    TUniqueId _next_instance_id;
-    string _stmt;
-
-    // RowBatch generation
-    std::unique_ptr<RowBatch> _batch;
-    int _next_val;
-    int64_t* _tuple_mem;
-
-    // receiving node
-    DataStreamMgr* _stream_mgr;
-    ThriftServer* _server;
-
-    // sending node(s)
-    TDataStreamSink _broadcast_sink;
-    TDataStreamSink _random_sink;
-    TDataStreamSink _hash_sink;
-    std::vector<TPlanFragmentDestination> _dest;
-
-    struct SenderInfo {
-        thread* thread_handle;
-        Status status;
-        int num_bytes_sent;
-
-        SenderInfo() : thread_handle(nullptr), num_bytes_sent(0) {}
-    };
-    std::vector<SenderInfo> _sender_info;
-
-    struct ReceiverInfo {
-        TPartitionType::type stream_type;
-        int num_senders;
-        int receiver_num;
-
-        thread* thread_handle;
-        std::shared_ptr<DataStreamRecvr> stream_recvr;
-        Status status;
-        int num_rows_received;
-        multiset<int64_t> data_values;
-
-        ReceiverInfo(TPartitionType::type stream_type, int num_senders, int 
receiver_num)
-                : stream_type(stream_type),
-                  num_senders(num_senders),
-                  receiver_num(receiver_num),
-                  thread_handle(nullptr),
-                  stream_recvr(nullptr),
-                  num_rows_received(0) {}
-
-        ~ReceiverInfo() {
-            delete thread_handle;
-            stream_recvr.reset();
-        }
-    };
-    std::vector<ReceiverInfo> _receiver_info;
-
-    // Create an instance id and add it to _dest
-    void get_next_instance_id(TUniqueId* instance_id) {
-        _dest.push_back(TPlanFragmentDestination());
-        TPlanFragmentDestination& dest = _dest.back();
-        dest.fragment_instance_id = _next_instance_id;
-        dest.server.hostname = "127.0.0.1";
-        dest.server.port = config::port;
-        *instance_id = _next_instance_id;
-        ++_next_instance_id.lo;
-    }
-
-    // RowDescriptor to mimic "select bigint_col from alltypesagg", except the 
slot
-    // isn't nullable
-    void create_row_desc() {
-        // create DescriptorTbl
-        TTupleDescriptor tuple_desc;
-        tuple_desc.__set_id(0);
-        tuple_desc.__set_byteSize(8);
-        tuple_desc.__set_numNullBytes(0);
-        TDescriptorTable thrift_desc_tbl;
-        thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc);
-        TSlotDescriptor slot_desc;
-        slot_desc.__set_id(0);
-        slot_desc.__set_parent(0);
-
-        slot_desc.slotType.types.push_back(TTypeNode());
-        slot_desc.slotType.types.back().__isset.scalar_type = true;
-        slot_desc.slotType.types.back().scalar_type.type = 
TPrimitiveType::BIGINT;
-
-        slot_desc.__set_columnPos(0);
-        slot_desc.__set_byteOffset(0);
-        slot_desc.__set_nullIndicatorByte(0);
-        slot_desc.__set_nullIndicatorBit(-1);
-        slot_desc.__set_slotIdx(0);
-        slot_desc.__set_isMaterialized(true);
-        thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
-        EXPECT_TRUE(DescriptorTbl::create(&_obj_pool, thrift_desc_tbl, 
&_desc_tbl).ok());
-        _runtime_state.set_desc_tbl(_desc_tbl);
-
-        std::vector<TTupleId> row_tids;
-        row_tids.push_back(0);
-
-        std::vector<bool> nullable_tuples;
-        nullable_tuples.push_back(false);
-        _row_desc = _obj_pool.add(new RowDescriptor(*_desc_tbl, row_tids, 
nullable_tuples));
-    }
-
-    // Create a tuple comparator to sort in ascending order on the single 
bigint column.
-    void create_tuple_comparator() {
-        TExprNode expr_node;
-        expr_node.node_type = TExprNodeType::SLOT_REF;
-        expr_node.type.types.push_back(TTypeNode());
-        expr_node.type.types.back().__isset.scalar_type = true;
-        expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
-        expr_node.num_children = 0;
-        TSlotRef slot_ref;
-        slot_ref.slot_id = 0;
-        expr_node.__set_slot_ref(slot_ref);
-
-        SlotRef* lhs_slot = _obj_pool.add(new SlotRef(expr_node));
-        _lhs_slot_ctx = _obj_pool.add(new ExprContext(lhs_slot));
-        SlotRef* rhs_slot = _obj_pool.add(new SlotRef(expr_node));
-        _rhs_slot_ctx = _obj_pool.add(new ExprContext(rhs_slot));
-
-        _lhs_slot_ctx->prepare(&_runtime_state, *_row_desc);
-        _rhs_slot_ctx->prepare(&_runtime_state, *_row_desc);
-        _lhs_slot_ctx->open(nullptr);
-        _rhs_slot_ctx->open(nullptr);
-        SortExecExprs* sort_exprs = _obj_pool.add(new SortExecExprs());
-        sort_exprs->init(vector<ExprContext*>(1, _lhs_slot_ctx),
-                         std::vector<ExprContext*>(1, _rhs_slot_ctx));
-        _less_than = _obj_pool.add(new TupleRowComparator(*sort_exprs, 
std::vector<bool>(1, true),
-                                                          std::vector<bool>(1, 
false)));
-    }
-
-    // Create _batch, but don't fill it with data yet. Assumes we created 
_row_desc.
-    RowBatch* create_row_batch() {
-        RowBatch* batch = new RowBatch(*_row_desc, BATCH_CAPACITY);
-        int64_t* tuple_mem =
-                
reinterpret_cast<int64_t*>(batch->tuple_data_pool()->allocate(BATCH_CAPACITY * 
8));
-        bzero(tuple_mem, BATCH_CAPACITY * 8);
-
-        for (int i = 0; i < BATCH_CAPACITY; ++i) {
-            int idx = batch->add_row();
-            TupleRow* row = batch->get_row(idx);
-            row->set_tuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
-            batch->commit_last_row();
-        }
-
-        return batch;
-    }
-
-    void get_next_batch(RowBatch* batch, int* next_val) {
-        LOG(INFO) << "batch_capacity=" << BATCH_CAPACITY << " next_val=" << 
*next_val;
-        for (int i = 0; i < BATCH_CAPACITY; ++i) {
-            TupleRow* row = batch->get_row(i);
-            int64_t* val = 
reinterpret_cast<int64_t*>(row->get_tuple(0)->get_slot(0));
-            *val = (*next_val)++;
-        }
-    }
-
-    // Start receiver (expecting given number of senders) in separate thread.
-    void start_receiver(TPartitionType::type stream_type, int num_senders, int 
receiver_num,
-                        int buffer_size, bool is_merging, TUniqueId* out_id = 
nullptr) {
-        VLOG_QUERY << "start receiver";
-        RuntimeProfile* profile = _obj_pool.add(new 
RuntimeProfile("TestReceiver"));
-        TUniqueId instance_id;
-        get_next_instance_id(&instance_id);
-        _receiver_info.push_back(ReceiverInfo(stream_type, num_senders, 
receiver_num));
-        ReceiverInfo& info = _receiver_info.back();
-        info.stream_recvr =
-                _stream_mgr->create_recvr(&_runtime_state, *_row_desc, 
instance_id, DEST_NODE_ID,
-                                          num_senders, buffer_size, profile, 
is_merging);
-        if (!is_merging) {
-            info.thread_handle = new thread(&DataStreamTest::read_stream, 
this, &info);
-        } else {
-            info.thread_handle =
-                    new thread(&DataStreamTest::read_stream_merging, this, 
&info, profile);
-        }
-
-        if (out_id != nullptr) {
-            *out_id = instance_id;
-        }
-    }
-
-    void join_receivers() {
-        VLOG_QUERY << "join receiver\n";
-
-        for (int i = 0; i < _receiver_info.size(); ++i) {
-            _receiver_info[i].thread_handle->join();
-            _receiver_info[i].stream_recvr->close();
-        }
-    }
-
-    // Deplete stream and print batches
-    void read_stream(ReceiverInfo* info) {
-        RowBatch* batch = nullptr;
-        VLOG_QUERY << "start reading";
-
-        while (!(info->status = 
info->stream_recvr->get_batch(&batch)).is_cancelled() &&
-               (batch != nullptr)) {
-            VLOG_QUERY << "read batch #rows=" << (batch != nullptr ? 
batch->num_rows() : 0);
-
-            for (int i = 0; i < batch->num_rows(); ++i) {
-                TupleRow* row = batch->get_row(i);
-                
info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0)));
-            }
-
-            SleepFor(MonoDelta::FromMilliseconds(
-                    10)); // slow down receiver to exercise buffering logic
-        }
-
-        if (info->status.is_cancelled()) {
-            VLOG_QUERY << "reader is cancelled";
-        }
-
-        VLOG_QUERY << "done reading";
-    }
-
-    void read_stream_merging(ReceiverInfo* info, RuntimeProfile* profile) {
-        info->status = info->stream_recvr->create_merger(*_less_than);
-        if (info->status.is_cancelled()) {
-            return;
-        }
-        RowBatch batch(*_row_desc, 1024);
-        VLOG_QUERY << "start reading merging";
-        bool eos = false;
-        while (!(info->status = info->stream_recvr->get_next(&batch, 
&eos)).is_cancelled()) {
-            VLOG_QUERY << "read batch #rows=" << batch.num_rows();
-            for (int i = 0; i < batch.num_rows(); ++i) {
-                TupleRow* row = batch.get_row(i);
-                
info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0)));
-            }
-            SleepFor(MonoDelta::FromMilliseconds(
-                    10)); // slow down receiver to exercise buffering logic
-            batch.reset();
-            if (eos) {
-                break;
-            }
-        }
-        if (info->status.is_cancelled()) {
-            VLOG_QUERY << "reader is cancelled";
-        }
-        VLOG_QUERY << "done reading";
-    }
-
-    // Verify correctness of receivers' data values.
-    void check_receivers(TPartitionType::type stream_type, int num_senders) {
-        int64_t total = 0;
-        multiset<int64_t> all_data_values;
-
-        for (int i = 0; i < _receiver_info.size(); ++i) {
-            ReceiverInfo& info = _receiver_info[i];
-            EXPECT_TRUE(info.status.ok());
-            total += info.data_values.size();
-            DCHECK_EQ(info.stream_type, stream_type);
-            DCHECK_EQ(info.num_senders, num_senders);
-
-            if (stream_type == TPartitionType::UNPARTITIONED) {
-                EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, 
info.data_values.size());
-            }
-
-            all_data_values.insert(info.data_values.begin(), 
info.data_values.end());
-
-            int k = 0;
-            for (multiset<int64_t>::iterator j = info.data_values.begin();
-                 j != info.data_values.end(); ++j, ++k) {
-                if (stream_type == TPartitionType::UNPARTITIONED) {
-                    // unpartitioned streams contain all values as many times 
as there are
-                    // senders
-                    EXPECT_EQ(k / num_senders, *j);
-                } else if (stream_type == TPartitionType::HASH_PARTITIONED) {
-                    // hash-partitioned streams send values to the right 
partition
-                    int64_t value = *j;
-                    uint32_t hash_val = RawValue::get_hash_value_fvn(&value, 
TYPE_BIGINT, 0U);
-                    EXPECT_EQ(hash_val % _receiver_info.size(), 
info.receiver_num);
-                }
-            }
-        }
-
-        if (stream_type == TPartitionType::HASH_PARTITIONED) {
-            EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
-
-            int k = 0;
-            for (multiset<int64_t>::iterator j = all_data_values.begin();
-                 j != all_data_values.end(); ++j, ++k) {
-                // each sender sent all values
-                EXPECT_EQ(k / num_senders, *j);
-
-                if (k / num_senders != *j) {
-                    break;
-                }
-            }
-        }
-    }
-
-    void check_senders() {
-        for (int i = 0; i < _sender_info.size(); ++i) {
-            EXPECT_TRUE(_sender_info[i].status.ok());
-            EXPECT_GT(_sender_info[i].num_bytes_sent, 0) << "info  i=" << i;
-        }
-    }
-
-    // Start backend in separate thread.
-    void start_backend() {
-        std::shared_ptr<DorisTestBackend> handler(new 
DorisTestBackend(_stream_mgr));
-        std::shared_ptr<apache::thrift::TProcessor> processor(new 
BackendServiceProcessor(handler));
-        _server = new ThriftServer("DataStreamTest backend", processor, 
config::port, nullptr);
-        _server->start();
-    }
-
-    void stop_backend() {
-        VLOG_QUERY << "stop backend\n";
-        _server->stop_for_testing();
-        delete _server;
-    }
-
-    void start_sender(TPartitionType::type partition_type = 
TPartitionType::UNPARTITIONED,
-                      int channel_buffer_size = 1024) {
-        VLOG_QUERY << "start sender";
-        int sender_id = _sender_info.size();
-        DCHECK_LT(sender_id, MAX_SENDERS);
-        _sender_info.push_back(SenderInfo());
-        SenderInfo& info = _sender_info.back();
-        info.thread_handle = new thread(&DataStreamTest::sender, this, 
sender_id,
-                                        channel_buffer_size, partition_type);
-    }
-
-    void join_senders() {
-        VLOG_QUERY << "join senders\n";
-        for (int i = 0; i < _sender_info.size(); ++i) {
-            _sender_info[i].thread_handle->join();
-        }
-    }
-
-    void sender(int sender_num, int channel_buffer_size, TPartitionType::type 
partition_type) {
-        RuntimeState state(TExecPlanFragmentParams(), TQueryOptions(), "", 
&_exec_env);
-        state.set_desc_tbl(_desc_tbl);
-        state.init_mem_trackers(TUniqueId());
-        VLOG_QUERY << "create sender " << sender_num;
-        const TDataStreamSink& stream_sink =
-                (partition_type == TPartitionType::UNPARTITIONED ? 
_broadcast_sink : _hash_sink);
-        DataStreamSender sender(&_obj_pool, sender_num, *_row_desc, 
stream_sink, _dest,
-                                channel_buffer_size);
-
-        TDataSink data_sink;
-        data_sink.__set_type(TDataSinkType::DATA_STREAM_SINK);
-        data_sink.__set_stream_sink(stream_sink);
-        EXPECT_TRUE(sender.init(data_sink).ok());
-
-        EXPECT_TRUE(sender.prepare(&state).ok());
-        EXPECT_TRUE(sender.open(&state).ok());
-        std::unique_ptr<RowBatch> batch(create_row_batch());
-        SenderInfo& info = _sender_info[sender_num];
-        int next_val = 0;
-
-        for (int i = 0; i < NUM_BATCHES; ++i) {
-            get_next_batch(batch.get(), &next_val);
-            VLOG_QUERY << "sender " << sender_num << ": #rows=" << 
batch->num_rows();
-            info.status = sender.send(&state, batch.get());
-
-            if (!info.status.ok()) {
-                LOG(WARNING) << "something is wrong when sending: " << 
info.status;
-                break;
-            }
-        }
-
-        VLOG_QUERY << "closing sender" << sender_num;
-        info.status = sender.close(&state, Status::OK());
-        info.num_bytes_sent = sender.get_num_data_bytes_sent();
-
-        batch->reset();
-    }
-
-    void test_stream(TPartitionType::type stream_type, int num_senders, int 
num_receivers,
-                     int buffer_size, bool is_merging) {
-        LOG(INFO) << "Testing stream=" << stream_type << " #senders=" << 
num_senders
-                  << " #receivers=" << num_receivers << " buffer_size=" << 
buffer_size;
-        reset();
-
-        for (int i = 0; i < num_receivers; ++i) {
-            start_receiver(stream_type, num_senders, i, buffer_size, 
is_merging);
-        }
-
-        for (int i = 0; i < num_senders; ++i) {
-            start_sender(stream_type, buffer_size);
-        }
-
-        join_senders();
-        check_senders();
-        join_receivers();
-        check_receivers(stream_type, num_senders);
-    }
-
-private:
-    ExprContext* _lhs_slot_ctx;
-    ExprContext* _rhs_slot_ctx;
-};
-
-TEST_F(DataStreamTest, UnknownSenderSmallResult) {
-    // starting a sender w/o a corresponding receiver does not result in an 
error because
-    // we cannot distinguish whether a receiver was never created or the 
receiver
-    // willingly tore down the stream
-    // case 1: entire query result fits in single buffer, close() returns ok
-    TUniqueId dummy_id;
-    get_next_instance_id(&dummy_id);
-    start_sender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
-    join_senders();
-    EXPECT_TRUE(_sender_info[0].status.ok());
-    EXPECT_GT(_sender_info[0].num_bytes_sent, 0);
-}
-
-TEST_F(DataStreamTest, UnknownSenderLargeResult) {
-    // case 2: query result requires multiple buffers, send() returns ok
-    TUniqueId dummy_id;
-    get_next_instance_id(&dummy_id);
-    start_sender();
-    join_senders();
-    EXPECT_TRUE(_sender_info[0].status.ok());
-    EXPECT_GT(_sender_info[0].num_bytes_sent, 0);
-}
-
-TEST_F(DataStreamTest, Cancel) {
-    TUniqueId instance_id;
-    start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, 
&instance_id);
-    _stream_mgr->cancel(instance_id);
-    start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, 
&instance_id);
-    _stream_mgr->cancel(instance_id);
-    join_receivers();
-    EXPECT_TRUE(_receiver_info[0].status.is_cancelled());
-}
-
-TEST_F(DataStreamTest, BasicTest) {
-    // TODO: also test that all client connections have been returned
-    TPartitionType::type stream_types[] = {TPartitionType::UNPARTITIONED,
-                                           TPartitionType::HASH_PARTITIONED};
-    int sender_nums[] = {1, 3};
-    int receiver_nums[] = {1, 3};
-    int buffer_sizes[] = {1024, 1024 * 1024};
-    bool merging[] = {false, true};
-
-    // test_stream(TPartitionType::HASH_PARTITIONED, 1, 3, 1024, true);
-    for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) {
-        for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) {
-            for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) {
-                for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) {
-                    for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) {
-                        LOG(ERROR) << "before test: stream_type=" << 
stream_types[i]
-                                   << "  sender num=" << sender_nums[j]
-                                   << "  receiver_num=" << receiver_nums[k]
-                                   << "  buffer_size=" << buffer_sizes[l]
-                                   << "  merging=" << (merging[m] ? "true" : 
"false");
-                        test_stream(stream_types[i], sender_nums[j], 
receiver_nums[k],
-                                    buffer_sizes[l], merging[m]);
-                        LOG(ERROR) << "after test: stream_type=" << 
stream_types[i]
-                                   << "  sender num=" << sender_nums[j]
-                                   << "  receiver_num=" << receiver_nums[k]
-                                   << "  buffer_size=" << buffer_sizes[l]
-                                   << "  merging=" << (merging[m] ? "true" : 
"false");
-                    }
-                }
-            }
-        }
-    }
-}
-
-// TODO: more tests:
-// - test case for transmission error in last batch
-// - receivers getting created concurrently
-
-} // namespace doris
diff --git a/be/test/runtime/load_channel_mgr_test.cpp 
b/be/test/runtime/load_channel_mgr_test.cpp
index 3569b30757..d99951d957 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -60,23 +60,6 @@ Status DeltaWriter::open(WriteRequest* req, DeltaWriter** 
writer) {
     return open_status;
 }
 
-Status DeltaWriter::write(Tuple* tuple) {
-    if (_k_tablet_recorder.find(_req.tablet_id) == 
std::end(_k_tablet_recorder)) {
-        _k_tablet_recorder[_req.tablet_id] = 1;
-    } else {
-        _k_tablet_recorder[_req.tablet_id]++;
-    }
-    return add_status;
-}
-
-Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& 
row_idxs) {
-    if (_k_tablet_recorder.find(_req.tablet_id) == 
std::end(_k_tablet_recorder)) {
-        _k_tablet_recorder[_req.tablet_id] = 0;
-    }
-    _k_tablet_recorder[_req.tablet_id] += row_idxs.size();
-    return add_status;
-}
-
 Status DeltaWriter::close() {
     return Status::OK();
 }
@@ -177,94 +160,6 @@ void create_schema(DescriptorTbl* desc_tbl, 
POlapTableSchemaParam* pschema) {
     indexes->set_schema_hash(123);
 }
 
-TEST_F(LoadChannelMgrTest, normal) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, 
&compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // check content
-    EXPECT_EQ(_k_tablet_recorder[20], 2);
-    EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
 TEST_F(LoadChannelMgrTest, cancel) {
     ExecEnv env;
     LoadChannelMgr mgr;
@@ -342,373 +237,4 @@ TEST_F(LoadChannelMgrTest, open_failed) {
     }
 }
 
-TEST_F(LoadChannelMgrTest, add_failed) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, 
&compressed_size);
-        // DeltaWriter's write will return -215
-        add_status = Status::Error<TABLE_NOT_FOUND>();
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        // st is still ok.
-        EXPECT_TRUE(st.ok());
-        EXPECT_EQ(2, response.tablet_errors().size());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, close_failed) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        request.add_partition_ids(10);
-        request.add_partition_ids(11);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, 
&compressed_size);
-        close_status = Status::Error<TABLE_NOT_FOUND>();
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        // even if delta close failed, the return status is still ok, but 
tablet_vec is empty
-        EXPECT_TRUE(st.ok());
-        EXPECT_TRUE(response.tablet_vec().empty());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, unknown_tablet) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(22);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, 
&compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_FALSE(st.ok());
-    }
-}
-
-TEST_F(LoadChannelMgrTest, duplicate_packet) {
-    ExecEnv env;
-    LoadChannelMgr mgr;
-    mgr.init(-1);
-
-    auto tdesc_tbl = create_descriptor_table();
-    ObjectPool obj_pool;
-    DescriptorTbl* desc_tbl = nullptr;
-    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
-    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
-    RowDescriptor row_desc(*desc_tbl, {0}, {false});
-    PUniqueId load_id;
-    load_id.set_hi(2);
-    load_id.set_lo(3);
-    {
-        PTabletWriterOpenRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_txn_id(1);
-        create_schema(desc_tbl, request.mutable_schema());
-        for (int i = 0; i < 2; ++i) {
-            auto tablet = request.add_tablets();
-            tablet->set_partition_id(10 + i);
-            tablet->set_tablet_id(20 + i);
-        }
-        request.set_num_senders(1);
-        request.set_need_gen_rollup(false);
-        auto st = mgr.open(request);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-
-    // add a batch
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(false);
-        request.set_packet_seq(0);
-
-        request.add_tablet_ids(20);
-        request.add_tablet_ids(21);
-        request.add_tablet_ids(20);
-
-        RowBatch row_batch(row_desc, 1024);
-
-        // row1
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
987654;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 1234567899876;
-            row_batch.commit_last_row();
-        }
-        // row2
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
12345678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 9876567899876;
-            row_batch.commit_last_row();
-        }
-        // row3
-        {
-            auto id = row_batch.add_row();
-            auto tuple = 
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
-            row_batch.get_row(id)->set_tuple(0, tuple);
-            memset(tuple, 0, tuple_desc->byte_size());
-            *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) = 
876545678;
-            *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset()) 
= 76543234567;
-            row_batch.commit_last_row();
-        }
-        row_batch.serialize(request.mutable_row_batch(), &uncompressed_size, 
&compressed_size);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        EXPECT_TRUE(st.ok());
-        PTabletWriterAddBatchResult response2;
-        st = mgr.add_batch(request, &response2);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // close
-    {
-        PTabletWriterAddBatchRequest request;
-        request.set_allocated_id(&load_id);
-        request.set_index_id(4);
-        request.set_sender_id(0);
-        request.set_eos(true);
-        request.set_packet_seq(0);
-        PTabletWriterAddBatchResult response;
-        auto st = mgr.add_batch(request, &response);
-        request.release_id();
-        EXPECT_TRUE(st.ok());
-    }
-    // check content
-    EXPECT_EQ(_k_tablet_recorder[20], 2);
-    EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
 } // namespace doris
diff --git a/be/test/runtime/result_sink_test.cpp 
b/be/test/runtime/result_sink_test.cpp
deleted file mode 100644
index 05a3caee03..0000000000
--- a/be/test/runtime/result_sink_test.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/result_sink.h"
-
-#include <gtest/gtest.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <iostream>
-
-#include "exprs/bool_literal.h"
-#include "exprs/expr.h"
-#include "exprs/float_literal.h"
-#include "exprs/int_literal.h"
-#include "exprs/string_literal.h"
-#include "exprs/timestamp_literal.h"
-#include "gen_cpp/Exprs_types.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/buffer_control_block.h"
-#include "runtime/primitive_type.h"
-#include "runtime/result_buffer_mgr.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple_row.h"
-#include "util/cpu_info.h"
-#include "util/mysql_row_buffer.h"
-
-namespace doris {
-
-class ResultSinkTest : public testing::Test {
-public:
-    ResultSinkTest() {
-        _runtime_state = new RuntimeState("ResultWriterTest");
-        _runtime_state->_exec_env = &_exec_env;
-
-        {
-            TExpr expr;
-            {
-                TExprNode node;
-
-                node.node_type = TExprNodeType::INT_LITERAL;
-                node.type = to_tcolumn_type_thrift(TPrimitiveType::TINYINT);
-                node.num_children = 0;
-                TIntLiteral data;
-                data.value = 1;
-                node.__set_int_literal(data);
-                expr.nodes.push_back(node);
-            }
-            _exprs.push_back(expr);
-        }
-    }
-    virtual ~ResultSinkTest() { delete _runtime_state; }
-
-protected:
-    virtual void SetUp() {}
-
-private:
-    ExecEnv _exec_env;
-    std::vector<TExpr> _exprs;
-    RuntimeState* _runtime_state;
-    RowDescriptor _row_desc;
-    TResultSink _tsink;
-};
-
-TEST_F(ResultSinkTest, init_normal) {
-    ResultSink sink(_row_desc, _exprs, _tsink, 1024);
-    EXPECT_TRUE(sink.init(_runtime_state).ok());
-    RowBatch row_batch(_row_desc, 1024);
-    row_batch.add_row();
-    row_batch.commit_last_row();
-    EXPECT_TRUE(sink.send(_runtime_state, &row_batch).ok());
-    EXPECT_TRUE(sink.close(_runtime_state, Status::OK()).ok());
-}
-
-} // namespace doris
-
-/* vim: set ts=4 sw=4 sts=4 tw=100 */
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp 
b/be/test/vec/exec/vtablet_sink_test.cpp
index a987f30150..ef7ac73ce7 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -289,31 +289,6 @@ public:
         status.to_protobuf(response->mutable_status());
     }
 
-    void tablet_writer_add_batch(google::protobuf::RpcController* controller,
-                                 const PTabletWriterAddBatchRequest* request,
-                                 PTabletWriterAddBatchResult* response,
-                                 google::protobuf::Closure* done) override {
-        brpc::ClosureGuard done_guard(done);
-        {
-            std::lock_guard<std::mutex> l(_lock);
-            _row_counters += request->tablet_ids_size();
-            if (request->eos()) {
-                _eof_counters++;
-            }
-            k_add_batch_status.to_protobuf(response->mutable_status());
-
-            if (request->has_row_batch() && _row_desc != nullptr) {
-                brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
-                
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, 
cntl);
-                RowBatch batch(*_row_desc, request->row_batch());
-                for (int i = 0; i < batch.num_rows(); ++i) {
-                    LOG(INFO) << batch.get_row(i)->to_string(*_row_desc);
-                    
_output_set->emplace(batch.get_row(i)->to_string(*_row_desc));
-                }
-            }
-        }
-    }
-
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index e49503ca04..6a47156b39 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -370,12 +370,6 @@ public class MockedBackendFactory {
             responseObserver.onCompleted();
         }
 
-        @Override
-        public void 
tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request, 
StreamObserver<InternalService.PTabletWriterAddBatchResult> responseObserver) {
-            responseObserver.onNext(null);
-            responseObserver.onCompleted();
-        }
-
         @Override
         public void 
tabletWriterCancel(InternalService.PTabletWriterCancelRequest request, 
StreamObserver<InternalService.PTabletWriterCancelResult> responseObserver) {
             responseObserver.onNext(null);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 552786f313..50bdb1bb68 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -551,8 +551,6 @@ service PBackendService {
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns 
(PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns 
(PTabletWriterOpenResult);
-    rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns 
(PTabletWriterAddBatchResult);
-    rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns 
(PTabletWriterAddBatchResult);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_add_block_by_http(PEmptyRequest) returns 
(PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns 
(PTabletWriterCancelResult);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to