This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef984a6  [improvement](load) Improve load fault tolerance (#7674)
ef984a6 is described below

commit ef984a6a7209f2f4fe74a29a2946f633eb02f7fd
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jan 20 09:23:21 2022 +0800

    [improvement](load) Improve load fault tolerance (#7674)
    
    Currently, if we encounter a problem with a replica of a tablet during the 
load process,
    such as a write error, rpc error, -235, etc., it will cause the entire load 
job to fail,
    which results in a significant reduction in Doris' fault tolerance.
    
    This PR mainly changes:
    
    1. Refined the judgment of failed replicas in the load process, so that the 
failure of a few replicas will not affect the normal completion of the load job.
    2. Fixed a bug introduced from #7754 that may cause BE coredump
---
 be/src/exec/broker_scanner.cpp                     |   2 +-
 be/src/exec/tablet_sink.cpp                        | 148 +++++++++++++--------
 be/src/exec/tablet_sink.h                          |  38 +++---
 be/src/olap/delta_writer.cpp                       |  18 +--
 be/src/olap/delta_writer.h                         |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |   7 +
 be/src/runtime/load_channel.cpp                    |  15 +--
 be/src/runtime/load_channel.h                      |   5 +-
 be/src/runtime/load_channel_mgr.cpp                |   7 +-
 be/src/runtime/load_channel_mgr.h                  |   2 +-
 be/src/runtime/runtime_state.h                     |   7 +
 be/src/runtime/tablets_channel.cpp                 |  83 +++++++-----
 be/src/runtime/tablets_channel.h                   |  10 +-
 be/src/service/internal_service.cpp                |   3 +-
 be/src/vec/sink/vtablet_sink.cpp                   |   2 +-
 be/test/olap/delta_writer_test.cpp                 |   6 +-
 be/test/runtime/load_channel_mgr_test.cpp          |  38 +++---
 .../org/apache/doris/common/proc/LoadProcDir.java  |   8 +-
 .../org/apache/doris/ldap/LdapAuthenticate.java    |   4 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   4 +-
 .../load/loadv2/BrokerLoadingTaskAttachment.java   |  10 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  15 +++
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  18 +--
 .../load/routineload/KafkaRoutineLoadJob.java      |   2 +-
 .../java/org/apache/doris/mysql/MysqlProto.java    |   4 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |   4 +
 .../java/org/apache/doris/qe/ConnectProcessor.java |  13 --
 .../main/java/org/apache/doris/qe/Coordinator.java |  53 +++++---
 .../apache/doris/transaction/ErrorTabletInfo.java  |  50 +++++++
 gensrc/proto/internal_service.proto                |   6 +
 gensrc/script/gen_build_version.sh                 |   2 +-
 gensrc/thrift/FrontendService.thrift               |   2 +
 gensrc/thrift/Types.thrift                         |   5 +
 33 files changed, 386 insertions(+), 209 deletions(-)

diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index ab96e6a..af44b8f 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -340,7 +340,7 @@ void BrokerScanner::split_line(const Slice& line) {
             _split_values.emplace_back(buf, len);
         }
         delete row;
-        delete ptr;
+        delete[] ptr;
     } else {
         const char* value = line.data;
         size_t start = 0;  // point to the start pos of next col value.
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 41f3570..eb018a6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -42,9 +42,9 @@
 namespace doris {
 namespace stream_load {
 
-NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t 
node_id,
+NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, 
int64_t node_id,
                          int32_t schema_hash)
-        : _parent(parent), _index_id(index_id), _node_id(node_id), 
_schema_hash(schema_hash) {}
+        : _parent(parent), _index_channel(index_channel), _node_id(node_id), 
_schema_hash(schema_hash) {}
 
 NodeChannel::~NodeChannel() {
     if (_open_closure != nullptr) {
@@ -90,7 +90,7 @@ Status NodeChannel::init(RuntimeState* state) {
 
     // Initialize _cur_add_batch_request
     _cur_add_batch_request.set_allocated_id(&_parent->_load_id);
-    _cur_add_batch_request.set_index_id(_index_id);
+    _cur_add_batch_request.set_index_id(_index_channel->_index_id);
     _cur_add_batch_request.set_sender_id(_parent->_sender_id);
     _cur_add_batch_request.set_backend_id(_node_id);
     _cur_add_batch_request.set_eos(false);
@@ -100,14 +100,14 @@ Status NodeChannel::init(RuntimeState* state) {
 
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
-    _name = fmt::format("NodeChannel[{}-{}]", _index_id, _node_id);
+    _name = fmt::format("NodeChannel[{}-{}]", _index_channel->_index_id, 
_node_id);
     return Status::OK();
 }
 
 void NodeChannel::open() {
     PTabletWriterOpenRequest request;
     request.set_allocated_id(&_parent->_load_id);
-    request.set_index_id(_index_id);
+    request.set_index_id(_index_channel->_index_id);
     request.set_txn_id(_parent->_txn_id);
     request.set_allocated_schema(_parent->_schema->to_protobuf());
     for (auto& tablet : _all_tablets) {
@@ -176,15 +176,27 @@ Status NodeChannel::open_wait() {
     // add batch closure
     _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this]() {
-        _cancel_with_msg(
-                fmt::format("{}, err: {}", channel_info(), 
_add_batch_closure->cntl.ErrorText()));
+        // If rpc failed, mark all tablets on this node channel as failed
+        _index_channel->mark_as_failed(this, 
_add_batch_closure->cntl.ErrorText(), -1);
+        Status st = _index_channel->check_intolerable_failure();
+        if (!st.ok()) {
+            _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), 
st.get_error_msg()));
+        }
     });
 
     _add_batch_closure->addSuccessHandler([this](const 
PTabletWriterAddBatchResult& result,
                                                  bool is_last_rpc) {
         Status status(result.status());
         if (status.ok()) {
-            if (is_last_rpc) {
+            // if has error tablet, handle them first 
+            for (auto& error : result.tablet_errors()) {
+                _index_channel->mark_as_failed(this, error.msg(), 
error.tablet_id());
+            }
+
+            Status st = _index_channel->check_intolerable_failure();
+            if (!st.ok()) {
+                _cancel_with_msg(st.get_error_msg());
+            } else if (is_last_rpc) {
                 for (auto& tablet : result.tablet_vec()) {
                     TTabletCommitInfo commit_info;
                     commit_info.tabletId = tablet.tablet_id();
@@ -254,6 +266,8 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t 
tablet_id) {
     return Status::OK();
 }
 
+// Used for vectorized engine.
+// TODO(cmy): deprecated, need refactor
 Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
     // If add_row() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
@@ -319,6 +333,7 @@ Status NodeChannel::mark_close() {
         _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
         _pending_batches_num++;
         DCHECK(_pending_batches.back().second.eos());
+        LOG(INFO) << channel_info() << " mark closed, left pending batch size: 
" << _pending_batches.size();
     }
 
     _eos_is_produced = true;
@@ -356,6 +371,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
         state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
                                             
std::make_move_iterator(_tablet_commit_infos.begin()),
                                             
std::make_move_iterator(_tablet_commit_infos.end()));
+
+        _index_channel->set_error_tablet_in_state(state);    
         return Status::OK();
     }
 
@@ -377,7 +394,7 @@ void NodeChannel::cancel(const std::string& cancel_msg) {
 
     PTabletWriterCancelRequest request;
     request.set_allocated_id(&_parent->_load_id);
-    request.set_index_id(_index_id);
+    request.set_index_id(_index_channel->_index_id);
     request.set_sender_id(_parent->_sender_id);
 
     auto closure = new RefCountClosure<PTabletWriterCancelResult>();
@@ -511,7 +528,7 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
             auto it = _node_channels.find(node_id);
             if (it == _node_channels.end()) {
                 channel = _parent->_pool->add(
-                        new NodeChannel(_parent, _index_id, node_id, 
_schema_hash));
+                        new NodeChannel(_parent, this, node_id, _schema_hash));
                 _node_channels.emplace(node_id, channel);
             } else {
                 channel = it->second;
@@ -528,59 +545,74 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
     return Status::OK();
 }
 
-Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
+void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
     auto it = _channels_by_tablet.find(tablet_id);
     DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << 
tablet_id;
-    std::stringstream ss;
     for (auto channel : it->second) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(tuple, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel);
-            ss << st.get_error_msg() << "; ";
+            mark_as_failed(channel, st.get_error_msg(), tablet_id);
+            // continue add row to other node, the error will be checked for 
every batch outside
         }
     }
-
-    if (has_intolerable_failure()) {
-        std::stringstream ss2;
-        ss2 << "index channel has intolerable failure. " << 
BackendOptions::get_localhost()
-            << ", err: " << ss.str();
-        return Status::InternalError(ss2.str());
-    }
-
-    return Status::OK();
 }
 
-Status IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+// Used for vectorized engine.
+// TODO(cmy): deprecated, need refactor
+void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
     auto it = _channels_by_tablet.find(tablet_id);
     DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << 
tablet_id;
-    std::stringstream ss;
     for (auto channel : it->second) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(block_row, tablet_id);
         if (!st.ok()) {
-            mark_as_failed(channel);
-            ss << st.get_error_msg() << "; ";
+            mark_as_failed(channel, st.get_error_msg(), tablet_id);
         }
     }
+}
 
-    if (has_intolerable_failure()) {
-        std::stringstream ss2;
-        ss2 << "index channel has intolerable failure. " << 
BackendOptions::get_localhost()
-            << ", err: " << ss.str();
-        return Status::InternalError(ss2.str());
+void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string& 
err, int64_t tablet_id) {
+    const auto& it = _tablets_by_channel.find(ch->node_id());
+    if (it == _tablets_by_channel.end()) {
+        return;
     }
 
-    return Status::OK();
+    {
+        std::lock_guard<SpinLock> l(_fail_lock); 
+        if (tablet_id == -1) {
+            for (const auto the_tablet_id : it->second) {
+                _failed_channels[the_tablet_id].insert(ch->node_id());
+                _failed_channels_msgs.emplace(the_tablet_id, err + ", host: " 
+ ch->host());
+                if (_failed_channels[the_tablet_id].size() >= 
((_parent->_num_replicas + 1) / 2)) {
+                    _intolerable_failure_status = 
Status::InternalError(_failed_channels_msgs[the_tablet_id]);
+                }
+            }
+        } else {
+            _failed_channels[tablet_id].insert(ch->node_id());
+            _failed_channels_msgs.emplace(tablet_id, err + ", host: " + 
ch->host());
+            if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas 
+ 1) / 2)) {
+                _intolerable_failure_status = 
Status::InternalError(_failed_channels_msgs[tablet_id]);
+            }
+        }
+    }
 }
 
-bool IndexChannel::has_intolerable_failure() {
-    for (const auto& it : _failed_channels) {
-        if (it.second.size() >= ((_parent->_num_replicas + 1) / 2)) {
-            return true;
-        }
+Status IndexChannel::check_intolerable_failure() {
+    std::lock_guard<SpinLock> l(_fail_lock); 
+    return _intolerable_failure_status;
+}
+
+void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
+    std::vector<TErrorTabletInfo>& error_tablet_infos = 
state->error_tablet_infos();
+
+    std::lock_guard<SpinLock> l(_fail_lock); 
+    for (const auto& it : _failed_channels_msgs) {
+        TErrorTabletInfo error_info;
+        error_info.__set_tabletId(it.first);
+        error_info.__set_msg(it.second);
+        error_tablet_infos.emplace_back(error_info);
     }
-    return false;
 }
 
 OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
@@ -757,22 +789,17 @@ Status OlapTableSink::open(RuntimeState* state) {
     }
 
     for (auto index_channel : _channels) {
-        std::stringstream ss;
-        index_channel->for_each_node_channel([&index_channel, 
&ss](NodeChannel* ch) {
+        index_channel->for_each_node_channel([&index_channel](NodeChannel* ch) 
{
             auto st = ch->open_wait();
             if (!st.ok()) {
-                std::stringstream err;
-                err << ch->channel_info() << ", tablet open failed, err: " << 
st.get_error_msg();
-                LOG(WARNING) << err.str();
-                ss << err.str() << "; ";
-                index_channel->mark_as_failed(ch);
+                // The open() phase is mainly to generate DeltaWriter 
instances on the nodes corresponding to each node channel.
+                // This phase will not fail due to a single tablet.
+                // Therefore, if the open() phase fails, all tablets 
corresponding to the node need to be marked as failed.
+                index_channel->mark_as_failed(ch, fmt::format("{}, open 
failed, err: {}", ch->channel_info(), st.get_error_msg()), -1);
             }
         });
 
-        if (index_channel->has_intolerable_failure()) {
-            LOG(WARNING) << "open failed, load_id=" << _load_id << ", err: " 
<< ss.str();
-            return Status::InternalError(ss.str());
-        }
+        RETURN_IF_ERROR(index_channel->check_intolerable_failure());
     }
     int32_t send_batch_parallelism =
             MIN(_send_batch_parallelism, 
config::max_send_batch_parallelism_per_job);
@@ -844,10 +871,15 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
         uint32_t tablet_index = dist_hash % partition->num_buckets;
         for (int j = 0; j < partition->indexes.size(); ++j) {
             int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
-            RETURN_IF_ERROR(_channels[j]->add_row(tuple, tablet_id));
+            _channels[j]->add_row(tuple, tablet_id);
             _number_output_rows++;
         }
     }
+
+    // check intolerable failure
+    for (auto index_channel : _channels) { 
+        RETURN_IF_ERROR(index_channel->check_intolerable_failure());
+    }
     return Status::OK();
 }
 
@@ -879,14 +911,13 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
             for (auto index_channel : _channels) {
                 int64_t add_batch_exec_time = 0;
                 index_channel->for_each_node_channel(
-                        [&status, &state, &node_add_batch_counter_map, 
&serialize_batch_ns,
+                        [&index_channel, &state, &node_add_batch_counter_map, 
&serialize_batch_ns,
                          &mem_exceeded_block_ns, &queue_push_lock_ns, 
&actual_consume_ns,
                          &total_add_batch_exec_time_ns, &add_batch_exec_time,
                          &total_add_batch_num](NodeChannel* ch) {
                             auto s = ch->close_wait(state);
                             if (!s.ok()) {
-                                // 'status' will store the last non-ok status 
of all channels
-                                status = s;
+                                index_channel->mark_as_failed(ch, 
s.get_error_msg(), -1);
                                 LOG(WARNING)
                                         << ch->channel_info()
                                         << ", close channel failed, err: " << 
s.get_error_msg();
@@ -900,7 +931,14 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
                 if (add_batch_exec_time > max_add_batch_exec_time_ns) {
                     max_add_batch_exec_time_ns = add_batch_exec_time;
                 }
-            }
+
+                // check if index has intolerable failure
+                Status index_st = index_channel->check_intolerable_failure();
+                if (!index_st.ok()) {
+                    status = index_st;
+                }
+            } // end for index channels
+            
         }
         // TODO need to be improved
         LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index df40493..77409e8e 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -144,9 +144,10 @@ private:
     std::function<void(const T&, bool)> success_handler;
 };
 
+class IndexChannel;
 class NodeChannel {
 public:
-    NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id, 
int32_t schema_hash);
+    NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t 
node_id, int32_t schema_hash);
     ~NodeChannel() noexcept;
 
     // called before open, used to add tablet located in this backend
@@ -195,6 +196,7 @@ public:
     }
 
     int64_t node_id() const { return _node_id; }
+    std::string host() const { return _node_info.host; }
     std::string name() const { return _name; }
 
     Status none_of(std::initializer_list<bool> vars);
@@ -203,10 +205,7 @@ public:
     void clear_all_batches();
 
     std::string channel_info() const {
-        // FIXME(cmy): There is a problem that when calling node_info, the 
node_info seems not initialized.
-        //             But I don't know why. so here I print node_info->id 
instead of node_info->host
-        //             to avoid BE crash. It needs further observation.
-        return fmt::format("{}, {}, node={}:{}", _name, _load_info, 
_node_info.id,
+        return fmt::format("{}, {}, node={}:{}", _name, _load_info, 
_node_info.host,
                            _node_info.brpc_port);
     }
 
@@ -215,7 +214,7 @@ private:
 
 private:
     OlapTableSink* _parent = nullptr;
-    int64_t _index_id = -1;
+    IndexChannel* _index_channel = nullptr;
     int64_t _node_id = -1;
     int32_t _schema_hash = 0;
     std::string _load_info;
@@ -276,9 +275,9 @@ public:
 
     Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets);
 
-    Status add_row(Tuple* tuple, int64_t tablet_id);
+    void add_row(Tuple* tuple, int64_t tablet_id);
 
-    Status add_row(BlockRow& block_row, int64_t tablet_id);
+    void add_row(BlockRow& block_row, int64_t tablet_id);
 
     void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
         for (auto& it : _node_channels) {
@@ -286,20 +285,17 @@ public:
         }
     }
 
-    void mark_as_failed(const NodeChannel* ch) {
-        const auto& it = _tablets_by_channel.find(ch->node_id());
-        if (it == _tablets_by_channel.end()) {
-            return;
-        }
-        for (const auto tablet_id : it->second) {
-            _failed_channels[tablet_id].insert(ch->node_id());
-        }
-    }
-    bool has_intolerable_failure();
+    void mark_as_failed(const NodeChannel* ch, const std::string& err, int64_t 
tablet_id = -1);
+    Status check_intolerable_failure();
+
+    // set error tablet info in runtime state, so that it can be returned to 
FE.
+    void set_error_tablet_in_state(RuntimeState* state);
 
     size_t num_node_channels() const { return _node_channels.size(); }
 
 private:
+    friend class NodeChannel;
+
     OlapTableSink* _parent;
     int64_t _index_id;
     int32_t _schema_hash;
@@ -310,8 +306,14 @@ private:
     std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
     // from backend channel to tablet_id
     std::unordered_map<int64_t, std::unordered_set<int64_t>> 
_tablets_by_channel;
+
+    // lock to protect _failed_channels and _failed_channels_msgs
+    mutable SpinLock _fail_lock;
     // key is tablet_id, value is a set of failed node id
     std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
+    // key is tablet_id, value is error message
+    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
+    Status _intolerable_failure_status = Status::OK();
 };
 
 // Write data to Olap Table.
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index bc94c71..e970203 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -284,7 +284,7 @@ OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec) {
+OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec, bool is_broken) {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
@@ -333,13 +333,15 @@ OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     }
 
 #ifndef BE_TEST
-    PTabletInfo* tablet_info = tablet_vec->Add();
-    tablet_info->set_tablet_id(_tablet->tablet_id());
-    tablet_info->set_schema_hash(_tablet->schema_hash());
-    if (_new_tablet != nullptr) {
-        tablet_info = tablet_vec->Add();
-        tablet_info->set_tablet_id(_new_tablet->tablet_id());
-        tablet_info->set_schema_hash(_new_tablet->schema_hash());
+    if (!is_broken) {
+        PTabletInfo* tablet_info = tablet_vec->Add();
+        tablet_info->set_tablet_id(_tablet->tablet_id());
+        tablet_info->set_schema_hash(_tablet->schema_hash());
+        if (_new_tablet != nullptr) {
+            tablet_info = tablet_vec->Add();
+            tablet_info->set_tablet_id(_new_tablet->tablet_id());
+            tablet_info->set_schema_hash(_new_tablet->schema_hash());
+        }
     }
 #endif
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4b80b571c..b8db713 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -68,7 +68,7 @@ public:
     OLAPStatus close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec);
+    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec, bool is_broken);
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -88,6 +88,8 @@ public:
     // Wait all memtable in flush queue to be flushed
     OLAPStatus wait_flush();
 
+    int64_t tablet_id() { return _tablet->tablet_id(); }
+
 private:
     DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
                 StorageEngine* storage_engine);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d3c40e9..0f75ae7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -353,6 +353,13 @@ void FragmentExecState::coordinator_callback(const Status& 
status, RuntimeProfil
                 params.commitInfos.push_back(info);
             }
         }
+        if (!runtime_state->error_tablet_infos().empty()) {
+            params.__isset.errorTabletInfos = true;
+            
params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
+            for (auto& info : runtime_state->error_tablet_infos()) {
+                params.errorTabletInfos.push_back(info);
+            }
+        }
 
         // Send new errors to coordinator
         runtime_state->get_unreported_errors(&(params.error_log));
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 27352a0..ee33cc3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -24,8 +24,10 @@
 namespace doris {
 
 LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t 
timeout_s,
-                         const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority)
-        : _load_id(load_id), _timeout_s(timeout_s), 
_is_high_priority(is_high_priority) {
+                         const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority,
+                         const std::string& sender_ip)
+        : _load_id(load_id), _timeout_s(timeout_s), 
_is_high_priority(is_high_priority),
+          _sender_ip(sender_ip) {
     _mem_tracker = MemTracker::CreateTracker(
             mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker, 
true, false, MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
@@ -42,9 +44,6 @@ LoadChannel::~LoadChannel() {
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
     int64_t index_id = params.index_id();
-    if (params.has_sender_ip()) {
-        _sender_ip = params.sender_ip();
-    }
     std::shared_ptr<TabletsChannel> channel;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -67,7 +66,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
 }
 
 Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
-                              google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec) {
+                              PTabletWriterAddBatchResult* response) {
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
@@ -91,7 +90,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBatchRequest& request,
 
     // 3. add batch to tablets channel
     if (request.has_row_batch()) {
-        RETURN_IF_ERROR(channel->add_batch(request));
+        RETURN_IF_ERROR(channel->add_batch(request, response));
     }
 
     // 4. handle eos
@@ -100,7 +99,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBatchRequest& request,
         bool finished = false;
         RETURN_IF_ERROR(channel->close(request.sender_id(), 
request.backend_id(), 
                                        &finished, request.partition_ids(),
-                                       tablet_vec));
+                                       response->mutable_tablet_vec()));
         if (finished) {
             std::lock_guard<std::mutex> l(_lock);
             _tablets_channels.erase(index_id);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 257dba8..13490f5 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,7 +39,8 @@ class TabletsChannel;
 class LoadChannel {
 public:
     LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
-                const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority);
+                const std::shared_ptr<MemTracker>& mem_tracker, bool 
is_high_priority,
+                const std::string& sender_ip);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -47,7 +48,7 @@ public:
 
     // this batch must belong to a index in one transaction
     Status add_batch(const PTabletWriterAddBatchRequest& request,
-                     google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec);
+                     PTabletWriterAddBatchResult* response);
 
     // return true if this load channel has been opened and all tablets 
channels are closed then.
     bool is_finished();
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 0b3367a..baa0e88 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -112,7 +112,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
             int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s);
 
             bool is_high_priority = (params.has_is_high_priority() && 
params.is_high_priority());
-            channel.reset(new LoadChannel(load_id, job_max_memory, 
job_timeout_s, _mem_tracker, is_high_priority));
+            channel.reset(new LoadChannel(load_id, job_max_memory, 
job_timeout_s, _mem_tracker, is_high_priority,
+                                          params.sender_ip()));
             _load_channels.insert({load_id, channel});
         }
     }
@@ -124,7 +125,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& 
params) {
 static void dummy_deleter(const CacheKey& key, void* value) {}
 
 Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
-                                 
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
+                                 PTabletWriterAddBatchResult* response) {
     UniqueId load_id(request.id());
     // 1. get load channel
     std::shared_ptr<LoadChannel> channel;
@@ -156,7 +157,7 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBatchRequest& request,
     // 3. add batch to load channel
     // batch may not exist in request(eg: eos request without batch),
     // this case will be handled in load channel's add batch method.
-    RETURN_IF_ERROR(channel->add_batch(request, tablet_vec));
+    RETURN_IF_ERROR(channel->add_batch(request, response));
 
     // 4. handle finish
     if (channel->is_finished()) {
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 450c8bf..1da0ec7 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -51,7 +51,7 @@ public:
     Status open(const PTabletWriterOpenRequest& request);
 
     Status add_batch(const PTabletWriterAddBatchRequest& request,
-                     google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec);
+                     PTabletWriterAddBatchResult* response);
 
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 8797076..449b4c2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -368,6 +368,12 @@ public:
 
     std::vector<TTabletCommitInfo>& tablet_commit_infos() { return 
_tablet_commit_infos; }
 
+    const std::vector<TErrorTabletInfo>& error_tablet_infos() const {
+        return _error_tablet_infos;
+    }
+
+    std::vector<TErrorTabletInfo>& error_tablet_infos() { return 
_error_tablet_infos; }
+
     /// Helper to call QueryState::StartSpilling().
     Status StartSpilling(MemTracker* mem_tracker);
 
@@ -508,6 +514,7 @@ private:
     std::unique_ptr<LoadErrorHub> _error_hub;
     std::mutex _create_error_hub_lock;
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
+    std::vector<TErrorTabletInfo> _error_tablet_infos;
 
     //TODO chenhao , remove this to QueryState
     /// Pool of buffer reservations used to distribute initial reservations to 
operators
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 68370f6..75dc64f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -51,33 +51,34 @@ TabletsChannel::~TabletsChannel() {
     delete _schema;
 }
 
-Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
+Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kOpened) {
         // Normal case, already open by other sender
         return Status::OK();
     }
-    LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << 
params.tablets().size()
-              << ", timeout(s): " << params.load_channel_timeout_s();
-    _txn_id = params.txn_id();
-    _index_id = params.index_id();
+    LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << 
request.tablets().size()
+              << ", timeout(s): " << request.load_channel_timeout_s();
+    _txn_id = request.txn_id();
+    _index_id = request.index_id();
     _schema = new OlapTableSchemaParam();
-    RETURN_IF_ERROR(_schema->init(params.schema()));
+    RETURN_IF_ERROR(_schema->init(request.schema()));
     _tuple_desc = _schema->tuple_desc();
     _row_desc = new RowDescriptor(_tuple_desc, false);
 
-    _num_remaining_senders = params.num_senders();
+    _num_remaining_senders = request.num_senders();
     _next_seqs.resize(_num_remaining_senders, 0);
     _closed_senders.Reset(_num_remaining_senders);
 
-    RETURN_IF_ERROR(_open_all_writers(params));
+    RETURN_IF_ERROR(_open_all_writers(request));
 
     _state = kOpened;
     return Status::OK();
 }
 
-Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
-    DCHECK(params.tablet_ids_size() == params.row_batch().num_rows());
+Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
+        PTabletWriterAddBatchResult* response) {
+    DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
     int64_t cur_seq;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -87,23 +88,27 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBatchRequest& params) {
                 : Status::InternalError(strings::Substitute("TabletsChannel $0 
state: $1",
                             _key.to_string(), _state));
         }
-        cur_seq = _next_seqs[params.sender_id()];
+        cur_seq = _next_seqs[request.sender_id()];
         // check packet
-        if (params.packet_seq() < cur_seq) {
+        if (request.packet_seq() < cur_seq) {
             LOG(INFO) << "packet has already recept before, expect_seq=" << 
cur_seq
-                << ", recept_seq=" << params.packet_seq();
+                << ", recept_seq=" << request.packet_seq();
             return Status::OK();
-        } else if (params.packet_seq() > cur_seq) {
+        } else if (request.packet_seq() > cur_seq) {
             LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
-                << ", recept_seq=" << params.packet_seq();
+                << ", recept_seq=" << request.packet_seq();
             return Status::InternalError("lost data packet");
         }
     }
 
-    RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
+    RowBatch row_batch(*_row_desc, request.row_batch(), _mem_tracker.get());
     std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index 
*/> tablet_to_rowidxs;
-    for (int i = 0; i < params.tablet_ids_size(); ++i) {
-        int64_t tablet_id = params.tablet_ids(i);
+    for (int i = 0; i < request.tablet_ids_size(); ++i) {
+        int64_t tablet_id = request.tablet_ids(i);
+        if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
+            // skip broken tablets
+            continue;
+        }
         auto it = tablet_to_rowidxs.find(tablet_id);
         if (it == tablet_to_rowidxs.end()) {
             tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i 
});
@@ -112,6 +117,7 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBatchRequest& params) {
         }
     }
 
+    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors = 
response->mutable_tablet_errors(); 
     for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
         auto tablet_writer_it = 
_tablet_writers.find(tablet_to_rowidxs_it.first);
         if (tablet_writer_it == _tablet_writers.end()) {
@@ -125,13 +131,18 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBatchRequest& params) {
                     "tablet writer write failed, tablet_id=$0, txn_id=$1, 
err=$2",
                     tablet_to_rowidxs_it.first, _txn_id, st);
             LOG(WARNING) << err_msg;
-            return Status::InternalError(err_msg);
+            PTabletError* error = tablet_errors->Add();
+            error->set_tablet_id(tablet_to_rowidxs_it.first);
+            error->set_msg(err_msg);
+            _broken_tablets.insert(tablet_to_rowidxs_it.first);
+            // continue writing to the other tablets.
+            // the error will be returned to the sender.
         }
     }
 
     {
         std::lock_guard<std::mutex> l(_lock);
-        _next_seqs[params.sender_id()] = cur_seq + 1;
+        _next_seqs[request.sender_id()] = cur_seq + 1;
     }
     return Status::OK();
 }
@@ -186,7 +197,7 @@ Status TabletsChannel::close(int sender_id, int64_t 
backend_id, bool* finished,
         for (auto writer : need_wait_writers) {
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE 
judge it.
-            writer->close_wait(tablet_vec);
+            writer->close_wait(tablet_vec, 
(_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
         }
         // TODO(gaodayue) clear and destruct all delta writers to make sure 
all memory are freed
         // DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -249,7 +260,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     return Status::OK();
 }
 
-Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
params) {
+Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
     for (auto& index : _schema->indexes()) {
@@ -264,21 +275,21 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& params)
         ss << "unknown index id, key=" << _key;
         return Status::InternalError(ss.str());
     }
-    for (auto& tablet : params.tablets()) {
-        WriteRequest request;
-        request.tablet_id = tablet.tablet_id();
-        request.schema_hash = schema_hash;
-        request.write_type = WriteType::LOAD;
-        request.txn_id = _txn_id;
-        request.partition_id = tablet.partition_id();
-        request.load_id = params.id();
-        request.need_gen_rollup = params.need_gen_rollup();
-        request.tuple_desc = _tuple_desc;
-        request.slots = index_slots;
-        request.is_high_priority = _is_high_priority;
+    for (auto& tablet : request.tablets()) {
+        WriteRequest wrequest;
+        wrequest.tablet_id = tablet.tablet_id();
+        wrequest.schema_hash = schema_hash;
+        wrequest.write_type = WriteType::LOAD;
+        wrequest.txn_id = _txn_id;
+        wrequest.partition_id = tablet.partition_id();
+        wrequest.load_id = request.id();
+        wrequest.need_gen_rollup = request.need_gen_rollup();
+        wrequest.tuple_desc = _tuple_desc;
+        wrequest.slots = index_slots;
+        wrequest.is_high_priority = _is_high_priority;
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&request, _mem_tracker, &writer);
+        auto st = DeltaWriter::open(&wrequest, _mem_tracker, &writer);
         if (st != OLAP_SUCCESS) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
@@ -290,7 +301,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& params)
         _tablet_writers.emplace(tablet.tablet_id(), writer);
     }
     _s_tablet_writer_count += _tablet_writers.size();
-    DCHECK_EQ(_tablet_writers.size(), params.tablets_size());
+    DCHECK_EQ(_tablet_writers.size(), request.tablets_size());
     return Status::OK();
 }
 
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 11144cb..e99ac62 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -58,10 +58,10 @@ public:
 
     ~TabletsChannel();
 
-    Status open(const PTabletWriterOpenRequest& params);
+    Status open(const PTabletWriterOpenRequest& request);
 
     // no-op when this channel has been closed or cancelled
-    Status add_batch(const PTabletWriterAddBatchRequest& batch);
+    Status add_batch(const PTabletWriterAddBatchRequest& request, 
PTabletWriterAddBatchResult* response);
 
     // Mark sender with 'sender_id' as closed.
     // If all senders are closed, close this channel, set '*finished' to true, 
update 'tablet_vec'
@@ -84,7 +84,7 @@ public:
 
 private:
     // open all writer
-    Status _open_all_writers(const PTabletWriterOpenRequest& params);
+    Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
 private:
     // id of this load channel
@@ -118,6 +118,10 @@ private:
 
     // tablet_id -> TabletChannel
     std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
+    // broken tablet ids.
+    // If a tablet write fails, its id will be added to this set,
+    // so that following batches will not handle this tablet anymore.
+    std::unordered_set<int64_t> _broken_tablets;
 
     std::unordered_set<int64_t> _partition_ids;
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 4106963..a948db8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -137,8 +137,7 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
             SCOPED_RAW_TIMER(&execution_time_ns);
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, 
cntl);
-            auto st = _exec_env->load_channel_mgr()->add_batch(*request,
-                                                               
response->mutable_tablet_vec());
+            auto st = _exec_env->load_channel_mgr()->add_batch(*request, 
response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << 
st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << 
request->index_id()
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 8b0f100..baf3b04 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -109,7 +109,7 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
         uint32_t tablet_index = dist_hash % partition->num_buckets;
         for (int j = 0; j < partition->indexes.size(); ++j) {
             int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
-            RETURN_IF_ERROR(_channels[j]->add_row(block_row, tablet_id));
+            _channels[j]->add_row(block_row, tablet_id);
             _number_output_rows++;
         }
     }
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index a0573b5..c4eb7f9 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
     ASSERT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr);
+    res = delta_writer->close_wait(nullptr, false);
     ASSERT_EQ(OLAP_SUCCESS, res);
     SAFE_DELETE(delta_writer);
 
@@ -472,7 +472,7 @@ TEST_F(TestDeltaWriter, write) {
 
     res = delta_writer->close();
     ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr);
+    res = delta_writer->close_wait(nullptr, false);
     ASSERT_EQ(OLAP_SUCCESS, res);
 
     // publish version success
@@ -552,7 +552,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
 
     res = delta_writer->close();
     ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr);
+    res = delta_writer->close_wait(nullptr, false);
     ASSERT_EQ(OLAP_SUCCESS, res);
 
     // publish version success
diff --git a/be/test/runtime/load_channel_mgr_test.cpp 
b/be/test/runtime/load_channel_mgr_test.cpp
index f8b2374..5664f45 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -85,7 +85,7 @@ OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec) {
+OLAPStatus 
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec, bool is_broken) {
     return close_status;
 }
 
@@ -257,8 +257,9 @@ TEST_F(LoadChannelMgrTest, normal) {
             row_batch.commit_last_row();
         }
         row_batch.serialize(request.mutable_row_batch());
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
-        auto st = mgr.add_batch(request, &tablet_vec);
+        // google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         request.release_id();
         ASSERT_TRUE(st.ok());
     }
@@ -423,11 +424,14 @@ TEST_F(LoadChannelMgrTest, add_failed) {
             row_batch.commit_last_row();
         }
         row_batch.serialize(request.mutable_row_batch());
+        // DeltaWriter's write will return -215
         add_status = OLAP_ERR_TABLE_NOT_FOUND;
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
-        auto st = mgr.add_batch(request, &tablet_vec);
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         request.release_id();
-        ASSERT_FALSE(st.ok());
+        // st is still ok.
+        ASSERT_TRUE(st.ok());
+        ASSERT_EQ(2, response.tablet_errors().size());
     }
 }
 
@@ -514,12 +518,12 @@ TEST_F(LoadChannelMgrTest, close_failed) {
         }
         row_batch.serialize(request.mutable_row_batch());
         close_status = OLAP_ERR_TABLE_NOT_FOUND;
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
-        auto st = mgr.add_batch(request, &tablet_vec);
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         request.release_id();
         // even if delta close failed, the return status is still ok, but 
tablet_vec is empty
         ASSERT_TRUE(st.ok());
-        ASSERT_TRUE(tablet_vec.empty());
+        ASSERT_TRUE(response.tablet_vec().empty());
     }
 }
 
@@ -602,8 +606,8 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
             row_batch.commit_last_row();
         }
         row_batch.serialize(request.mutable_row_batch());
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
-        auto st = mgr.add_batch(request, &tablet_vec);
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         request.release_id();
         ASSERT_FALSE(st.ok());
     }
@@ -688,11 +692,11 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
             row_batch.commit_last_row();
         }
         row_batch.serialize(request.mutable_row_batch());
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec1;
-        auto st = mgr.add_batch(request, &tablet_vec1);
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         ASSERT_TRUE(st.ok());
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec2;
-        st = mgr.add_batch(request, &tablet_vec2);
+        PTabletWriterAddBatchResult response2;
+        st = mgr.add_batch(request, &response2);
         request.release_id();
         ASSERT_TRUE(st.ok());
     }
@@ -704,8 +708,8 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
         request.set_sender_id(0);
         request.set_eos(true);
         request.set_packet_seq(0);
-        google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
-        auto st = mgr.add_batch(request, &tablet_vec);
+        PTabletWriterAddBatchResult response;
+        auto st = mgr.add_batch(request, &response);
         request.release_id();
         ASSERT_TRUE(st.ok());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index f1a9c56..26658a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface {
             .add("JobId").add("Label").add("State").add("Progress")
             
.add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime")
             
.add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime")
-            .add("URL").add("JobDetails")
+            
.add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets")
             .build();
 
     // label and state column index of result
@@ -65,10 +65,10 @@ public class LoadProcDir implements ProcDirInterface {
 
         // merge load job from load and loadManager
         LinkedList<List<Comparable>> loadJobInfos = 
load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
-                                                                             
null, false, null);
+                null, false, null);
         
loadJobInfos.addAll(Catalog.getCurrentCatalog().getLoadManager().getLoadJobInfosByDb(db.getId(),
 null,
-                                                                               
              false,
-                                                                               
              null));
+                false,
+                null));
         int counter = 0;
         Iterator<List<Comparable>> iterator = 
loadJobInfos.descendingIterator();
         while (iterator.hasNext()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java 
b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
index 532a6b5..076e1f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
@@ -29,8 +29,8 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 
@@ -73,7 +73,7 @@ public class LdapAuthenticate {
         try {
             if (!LdapClient.checkPassword(userName, password)) {
                 LOG.debug("user:{} use error LDAP password.", userName);
-                ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
qualifiedUser, usePasswd);
+                ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
qualifiedUser, context.getRemoteIP(), usePasswd);
                 return false;
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index c0290f1..6f03f4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -220,7 +220,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                 txnState.addTableIndexes(table);
             }
         } finally {
-           MetaLockUtils.readUnlockTables(tableList);
+            MetaLockUtils.readUnlockTables(tableList);
         }
         // Submit task outside the database lock, cause it may take a while if 
task queue is full.
         for (LoadTask loadTask : newLoadingTasks) {
@@ -342,6 +342,8 @@ public class BrokerLoadJob extends BulkLoadJob {
             loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
         }
         commitInfos.addAll(attachment.getCommitInfoList());
+        errorTabletInfos.addAll(attachment.getErrorTabletInfos());
+
         progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 
100);
         if (progress == 100) {
             progress = 99;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
index 0a6acd9..7aefd33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
 
 import java.util.List;
@@ -27,13 +28,16 @@ public class BrokerLoadingTaskAttachment extends 
TaskAttachment {
     private Map<String, String> counters;
     private String trackingUrl;
     private List<TabletCommitInfo> commitInfoList;
+    List<ErrorTabletInfo> errorTabletInfos;
 
     public BrokerLoadingTaskAttachment(long taskId, Map<String, String> 
counters, String trackingUrl,
-                                       List<TabletCommitInfo> commitInfoList) {
+                                       List<TabletCommitInfo> commitInfoList,
+                                       List<ErrorTabletInfo> errorTabletInfos) 
{
         super(taskId);
         this.trackingUrl = trackingUrl;
         this.counters = counters;
         this.commitInfoList = commitInfoList;
+        this.errorTabletInfos = errorTabletInfos;
     }
 
     public String getCounter(String key) {
@@ -47,4 +51,8 @@ public class BrokerLoadingTaskAttachment extends 
TaskAttachment {
     public List<TabletCommitInfo> getCommitInfoList() {
         return commitInfoList;
     }
+
+    public List<ErrorTabletInfo> getErrorTabletInfos() {
+        return errorTabletInfos;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 5992bed..756f6ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -56,6 +56,7 @@ import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
@@ -69,6 +70,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
@@ -129,6 +131,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
     // only for persistence param. see readFields() for usage
     private boolean isJobTypeRead = false;
 
+    protected List<ErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
+
     public static class LoadStatistic {
         // number of rows processed on BE, this number will be updated 
periodically by query report.
         // A load job may has several load tasks(queries), and each task has 
several fragments.
@@ -773,12 +777,23 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
             // tracking url
             jobInfo.add(loadingStatus.getTrackingUrl());
             jobInfo.add(loadStatistic.toJson());
+            // transaction id
+            jobInfo.add(transactionId);
+            // error tablets
+            jobInfo.add(errorTabletsToJson());
             return jobInfo;
         } finally {
             readUnlock();
         }
     }
 
+    public String errorTabletsToJson() {
+        Map<Long, String> map = Maps.newHashMap();
+        errorTabletInfos.stream().limit(3).forEach(p -> 
map.put(p.getTabletId(), p.getMsg()));
+        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+        return gson.toJson(map);
+    }
+
     protected String getResourceName() {
         return "N/A";
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index dc6c371..6ff10fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -36,6 +36,7 @@ import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
 
 import org.apache.logging.log4j.LogManager;
@@ -109,7 +110,7 @@ public class LoadLoadingTask extends LoadTask {
     }
 
     @Override
-    protected void executeTask() throws Exception{
+    protected void executeTask() throws Exception {
         LOG.info("begin to execute loading task. load id: {} job id: {}. db: 
{}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), 
db.getFullName(), table.getName(), retryTime);
         retryTime--;
@@ -127,8 +128,8 @@ public class LoadLoadingTask extends LoadTask {
         /*
          * For broker load job, user only need to set mem limit by 
'exec_mem_limit' property.
          * And the variable 'load_mem_limit' does not make any effect.
-         * However, in order to ensure the consistency of semantics when 
executing on the BE side, 
-         * and to prevent subsequent modification from incorrectly setting the 
load_mem_limit, 
+         * However, in order to ensure the consistency of semantics when 
executing on the BE side,
+         * and to prevent subsequent modification from incorrectly setting the 
load_mem_limit,
          * here we use exec_mem_limit to directly override the load_mem_limit 
property.
          */
         curCoordinator.setLoadMemLimit(execMemLimit);
@@ -150,10 +151,10 @@ public class LoadLoadingTask extends LoadTask {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
-                              .add("task_id", signature)
-                              .add("query_id", 
DebugUtil.printId(curCoordinator.getQueryId()))
-                              .add("msg", "begin to execute plan")
-                              .build());
+                    .add("task_id", signature)
+                    .add("query_id", 
DebugUtil.printId(curCoordinator.getQueryId()))
+                    .add("msg", "begin to execute plan")
+                    .build());
         }
         curCoordinator.exec();
         if (curCoordinator.join(waitSecond)) {
@@ -162,7 +163,8 @@ public class LoadLoadingTask extends LoadTask {
                 attachment = new BrokerLoadingTaskAttachment(signature,
                         curCoordinator.getLoadCounters(),
                         curCoordinator.getTrackingUrl(),
-                        
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
+                        
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()),
+                        
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()));
                 // Create profile of this task and add to the job profile.
                 createProfile(curCoordinator);
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7d4ad46..6e8c66c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -280,7 +280,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         // If user does not specify kafka partition,
         // We will fetch partition from kafka server periodically
         if (this.state == JobState.RUNNING || this.state == 
JobState.NEED_SCHEDULE) {
-            if (customKafkaPartitions == null && 
!customKafkaPartitions.isEmpty()) {
+            if (customKafkaPartitions != null && 
!customKafkaPartitions.isEmpty()) {
                 return;
             }
             updateKafkaPartitions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index ae55381..b323600 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -56,7 +56,7 @@ public class MysqlProto {
         List<UserIdentity> currentUserIdentity = Lists.newArrayList();
         if 
(!Catalog.getCurrentCatalog().getAuth().checkPassword(qualifiedUser, remoteIp,
                 scramble, randomString, currentUserIdentity)) {
-            ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
qualifiedUser, usePasswd);
+            ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
qualifiedUser, context.getRemoteIP(), usePasswd);
             return false;
         }
 
@@ -70,7 +70,7 @@ public class MysqlProto {
 
         String tmpUser = user;
         if (tmpUser == null || tmpUser.isEmpty()) {
-            ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, "no user", 
usePasswd);
+            ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, "anonym@" + 
context.getRemoteIP(), usePasswd);
             return null;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 4290dd1..4afcabc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -520,6 +520,10 @@ public class ConnectContext {
         return currentConnectedFEIp;
     }
 
+    public String getRemoteIp() {
+        return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp();
+    }
+
     public class ThreadInfo {
         public List<String> toRow(long nowMs) {
             List<String> row = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 51d5dea..faee830 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -470,19 +470,6 @@ public class ConnectProcessor {
         }
 
         ctx.setThreadLocalInfo();
-
-        if (ctx.getCurrentUserIdentity() == null) {
-            // if we upgrade Master FE first, the request from old FE does not 
set "current_user_ident".
-            // so ctx.getCurrentUserIdentity() will get null, and causing 
NullPointerException after using it.
-            // return error directly.
-            TMasterOpResult result = new TMasterOpResult();
-            ctx.getState().setError(ErrorCode.ERR_COMMON_ERROR, "Missing 
current user identity. You need to upgrade this Frontend " +
-                    "to the same version as Master Frontend.");
-            
result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue());
-            result.setPacket(getResultPacket());
-            return result;
-        }
-
         StmtExecutor executor = null;
         try {
             // 0 for compatibility.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 87d9c2b..407bbea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -65,6 +65,7 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
@@ -198,6 +199,7 @@ public class Coordinator {
     private List<String> exportFiles;
 
     private List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
+    private List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
 
     // Input parameter
     private long jobId = -1; // job which this task belongs to
@@ -362,6 +364,10 @@ public class Coordinator {
         return commitInfos;
     }
 
+    public List<TErrorTabletInfo> getErrorTabletInfos() {
+        return errorTabletInfos;
+    }
+
     // Initialize
     private void prepare() {
         for (PlanFragment fragment : fragments) {
@@ -472,11 +478,11 @@ public class Coordinator {
                     && ((ResultFileSink) topDataSink).getStorageType() == 
StorageBackend.StorageType.BROKER) {
                 // set the broker address for OUTFILE sink
                 ResultFileSink topResultFileSink = (ResultFileSink) 
topDataSink;
-                    FsBroker broker = 
Catalog.getCurrentCatalog().getBrokerMgr()
-                            .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
+                FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr()
+                        .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
                 topResultFileSink.setBrokerAddr(broker.ip, broker.port);
             }
-        }  else {
+        } else {
             // This is a load process.
             this.queryOptions.setIsReportSuccess(true);
             deltaUrls = Lists.newArrayList();
@@ -704,6 +710,15 @@ public class Coordinator {
         }
     }
 
+    private void updateErrorTabletInfos(List<TErrorTabletInfo> 
errorTabletInfos) {
+        lock.lock();
+        try {
+            this.errorTabletInfos.addAll(errorTabletInfos);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     private void updateStatus(Status status, TUniqueId instanceId) {
         lock.lock();
         try {
@@ -1193,10 +1208,10 @@ public class Coordinator {
     // Traverse the expected runtimeFilterID in each fragment, and establish 
the corresponding relationship
     // between runtimeFilterID and fragment instance addr and select the merge 
instance of runtimeFilter
     private void assignRuntimeFilterAddr() throws Exception {
-        for (PlanFragment fragment: fragments) {
+        for (PlanFragment fragment : fragments) {
             FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
             // Transform <fragment, runtimeFilterId> to <runtimeFilterId, 
fragment>
-            for (RuntimeFilterId rid: fragment.getTargetRuntimeFilterIds()) {
+            for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
                 List<FRuntimeFilterTargetParam> targetFragments =
                         ridToTargetParam.computeIfAbsent(rid, k -> new 
ArrayList<>());
                 for (final FInstanceExecParam instance : 
params.instanceExecParams) {
@@ -1204,7 +1219,7 @@ public class Coordinator {
                 }
             }
 
-            for (RuntimeFilterId rid: fragment.getBuilderRuntimeFilterIds()) {
+            for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
                 ridToBuilderNum.merge(rid, params.instanceExecParams.size(), 
Integer::sum);
             }
         }
@@ -1262,7 +1277,7 @@ public class Coordinator {
 
     // whether we can overwrite the first parameter or not?
     private List<TScanRangeParams> findOrInsert(Map<Integer, 
List<TScanRangeParams>> m, Integer key,
-            ArrayList<TScanRangeParams> defaultVal) {
+                                                ArrayList<TScanRangeParams> 
defaultVal) {
         List<TScanRangeParams> value = m.get(key);
         if (value == null) {
             m.put(key, defaultVal);
@@ -1419,8 +1434,8 @@ public class Coordinator {
     }
 
     public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations 
seqLocation,
-                                          HashMap<TNetworkAddress, Long> 
assignedBytesPerHost,
-                                          Reference<Long> backendIdRef) throws 
UserException {
+                                                         
HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+                                                         Reference<Long> 
backendIdRef) throws UserException {
         if (!Config.enable_local_replica_selection) {
             return selectBackendsByRoundRobin(seqLocation.getLocations(), 
assignedBytesPerHost, backendIdRef);
         }
@@ -1541,6 +1556,9 @@ public class Coordinator {
             if (params.isSetCommitInfos()) {
                 updateCommitInfos(params.getCommitInfos());
             }
+            if (params.isSetErrorTabletInfos()) {
+                updateErrorTabletInfos(params.getErrorTabletInfos());
+            }
             profileDoneSignal.markedCountDown(params.getFragmentInstanceId(), 
-1L);
         }
 
@@ -1706,7 +1724,7 @@ public class Coordinator {
 
         // make sure each host have average bucket to scan
         private void 
getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, 
PlanFragmentId fragmentId, Integer bucketSeq,
-            ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, 
Long> addressToBackendID) throws Exception {
+                                                              
ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> 
addressToBackendID) throws Exception {
             Map<Long, Integer> buckendIdToBucketCountMap = 
fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
             int maxBucketNum = Integer.MAX_VALUE;
             long buckendId = Long.MAX_VALUE;
@@ -1860,7 +1878,7 @@ public class Coordinator {
         long lastMissingHeartbeatTime = -1;
 
         public BackendExecState(PlanFragmentId fragmentId, int instanceId, int 
profileFragmentId,
-            TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> 
addressToBackendID) {
+                                TExecPlanFragmentParams rpcParams, 
Map<TNetworkAddress, Long> addressToBackendID) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
             this.instanceId = instanceId;
@@ -2076,18 +2094,18 @@ public class Coordinator {
                 params.params.setRuntimeFilterParams(new 
TRuntimeFilterParams());
                 
params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
                 if 
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
-                    for (Map.Entry<RuntimeFilterId, 
List<FRuntimeFilterTargetParam>> entry: ridToTargetParam.entrySet()) {
+                    for (Map.Entry<RuntimeFilterId, 
List<FRuntimeFilterTargetParam>> entry : ridToTargetParam.entrySet()) {
                         List<TRuntimeFilterTargetParams> targetParams = 
Lists.newArrayList();
-                        for (FRuntimeFilterTargetParam targetParam: 
entry.getValue()) {
+                        for (FRuntimeFilterTargetParam targetParam : 
entry.getValue()) {
                             targetParams.add(new 
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
                                     targetParam.targetFragmentInstanceAddr));
                         }
                         
params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(),
 targetParams);
                     }
-                    for (Map.Entry<RuntimeFilterId, Integer> entry: 
ridToBuilderNum.entrySet()) {
+                    for (Map.Entry<RuntimeFilterId, Integer> entry : 
ridToBuilderNum.entrySet()) {
                         
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(entry.getKey().asInt(),
 entry.getValue());
                     }
-                    for (RuntimeFilter rf: assignedRuntimeFilters) {
+                    for (RuntimeFilter rf : assignedRuntimeFilters) {
                         
params.params.runtime_filter_params.putToRidToRuntimeFilter(rf.getFilterId().asInt(),
 rf.toThrift());
                     }
                 }
@@ -2185,7 +2203,7 @@ public class Coordinator {
         }
 
         public FInstanceExecParam(TUniqueId id, TNetworkAddress host,
-                int perFragmentInstanceIdx, FragmentExecParams 
fragmentExecParams) {
+                                  int perFragmentInstanceIdx, 
FragmentExecParams fragmentExecParams) {
             this.instanceId = id;
             this.host = host;
             this.perFragmentInstanceIdx = perFragmentInstanceIdx;
@@ -2229,7 +2247,7 @@
 
     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
-        public TUniqueId targetFragmentInstanceId;;
+        public TUniqueId targetFragmentInstanceId;
         public TNetworkAddress targetFragmentInstanceAddr;
 
         public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java
new file mode 100644
index 0000000..6410888
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TErrorTabletInfo;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class ErrorTabletInfo {
+    private long tabletId;
+    private String msg;
+
+    public ErrorTabletInfo(long tabletId, String msg) {
+        this.tabletId = tabletId;
+        this.msg = msg;
+    }
+
+    public long getTabletId() {
+        return tabletId;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public static List<ErrorTabletInfo> fromThrift(List<TErrorTabletInfo> 
errorTabletInfos) {
+        List<ErrorTabletInfo> errorInfos = Lists.newArrayList();
+        for (TErrorTabletInfo tErrorInfo : errorTabletInfos) {
+            errorInfos.add(new ErrorTabletInfo(tErrorInfo.getTabletId(), 
tErrorInfo.getMsg()));
+        }
+        return errorInfos;
+    }
+}
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 7de412d..b78a64e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -105,12 +105,18 @@ message PTabletWriterAddBatchRequest {
     optional bool is_high_priority = 11 [default = false];
 };
 
+message PTabletError {
+    optional int64 tablet_id = 1;
+    optional string msg = 2;
+}
+
 message PTabletWriterAddBatchResult {
     required PStatus status = 1;
     repeated PTabletInfo tablet_vec = 2;
     optional int64 execution_time_us = 3;
     optional int64 wait_lock_time_us = 4;
     optional int64 wait_execution_time_us = 5;
+    repeated PTabletError tablet_errors = 6;
 };
 
 // tablet writer cancel
diff --git a/gensrc/script/gen_build_version.sh 
b/gensrc/script/gen_build_version.sh
index b9a3f1a..2937cdb 100755
--- a/gensrc/script/gen_build_version.sh
+++ b/gensrc/script/gen_build_version.sh
@@ -25,7 +25,7 @@
 # contains the build version based on the git hash or svn revision.
 ##############################################################
 
-build_version="pre-0.15.0"
+build_version="trunk"
 
 unset LANG
 unset LC_CTYPE
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 1b5cade..2818196 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -400,6 +400,8 @@ struct TReportExecStatusParams {
   16: optional i64 backend_id
 
   17: optional i64 loaded_bytes
+
+  18: optional list<Types.TErrorTabletInfo> errorTabletInfos
 }
 
 struct TFeResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index c69f0d9..67a40c1 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -408,6 +408,11 @@ struct TTabletCommitInfo {
     2: required i64 backendId
 }
 
+struct TErrorTabletInfo {
+    1: optional i64 tabletId
+    2: optional string msg
+}
+
 enum TLoadType {
     MANUL_LOAD,
     ROUTINE_LOAD,

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to