This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef984a6 [improvement](load) Improve load fault tolerance (#7674)
ef984a6 is described below
commit ef984a6a7209f2f4fe74a29a2946f633eb02f7fd
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jan 20 09:23:21 2022 +0800
[improvement](load) Improve load fault tolerance (#7674)
Currently, if we encounter a problem with a replica of a tablet during the
load process,
such as a write error, rpc error, -235, etc., it will cause the entire load
job to fail,
which results in a significant reduction in Doris' fault tolerance.
This PR mainly changes:
1. refined the judgment of failed replicas in the load process, so that the
failure of a few replicas will not affect the normal completion of the load job.
2. fix a bug introduced by #7754 that may cause BE coredump
---
be/src/exec/broker_scanner.cpp | 2 +-
be/src/exec/tablet_sink.cpp | 148 +++++++++++++--------
be/src/exec/tablet_sink.h | 38 +++---
be/src/olap/delta_writer.cpp | 18 +--
be/src/olap/delta_writer.h | 4 +-
be/src/runtime/fragment_mgr.cpp | 7 +
be/src/runtime/load_channel.cpp | 15 +--
be/src/runtime/load_channel.h | 5 +-
be/src/runtime/load_channel_mgr.cpp | 7 +-
be/src/runtime/load_channel_mgr.h | 2 +-
be/src/runtime/runtime_state.h | 7 +
be/src/runtime/tablets_channel.cpp | 83 +++++++-----
be/src/runtime/tablets_channel.h | 10 +-
be/src/service/internal_service.cpp | 3 +-
be/src/vec/sink/vtablet_sink.cpp | 2 +-
be/test/olap/delta_writer_test.cpp | 6 +-
be/test/runtime/load_channel_mgr_test.cpp | 38 +++---
.../org/apache/doris/common/proc/LoadProcDir.java | 8 +-
.../org/apache/doris/ldap/LdapAuthenticate.java | 4 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +-
.../load/loadv2/BrokerLoadingTaskAttachment.java | 10 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 15 +++
.../apache/doris/load/loadv2/LoadLoadingTask.java | 18 +--
.../load/routineload/KafkaRoutineLoadJob.java | 2 +-
.../java/org/apache/doris/mysql/MysqlProto.java | 4 +-
.../java/org/apache/doris/qe/ConnectContext.java | 4 +
.../java/org/apache/doris/qe/ConnectProcessor.java | 13 --
.../main/java/org/apache/doris/qe/Coordinator.java | 53 +++++---
.../apache/doris/transaction/ErrorTabletInfo.java | 50 +++++++
gensrc/proto/internal_service.proto | 6 +
gensrc/script/gen_build_version.sh | 2 +-
gensrc/thrift/FrontendService.thrift | 2 +
gensrc/thrift/Types.thrift | 5 +
33 files changed, 386 insertions(+), 209 deletions(-)
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index ab96e6a..af44b8f 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -340,7 +340,7 @@ void BrokerScanner::split_line(const Slice& line) {
_split_values.emplace_back(buf, len);
}
delete row;
- delete ptr;
+ delete[] ptr;
} else {
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 41f3570..eb018a6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -42,9 +42,9 @@
namespace doris {
namespace stream_load {
-NodeChannel::NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t
node_id,
+NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel,
int64_t node_id,
int32_t schema_hash)
- : _parent(parent), _index_id(index_id), _node_id(node_id),
_schema_hash(schema_hash) {}
+ : _parent(parent), _index_channel(index_channel), _node_id(node_id),
_schema_hash(schema_hash) {}
NodeChannel::~NodeChannel() {
if (_open_closure != nullptr) {
@@ -90,7 +90,7 @@ Status NodeChannel::init(RuntimeState* state) {
// Initialize _cur_add_batch_request
_cur_add_batch_request.set_allocated_id(&_parent->_load_id);
- _cur_add_batch_request.set_index_id(_index_id);
+ _cur_add_batch_request.set_index_id(_index_channel->_index_id);
_cur_add_batch_request.set_sender_id(_parent->_sender_id);
_cur_add_batch_request.set_backend_id(_node_id);
_cur_add_batch_request.set_eos(false);
@@ -100,14 +100,14 @@ Status NodeChannel::init(RuntimeState* state) {
_load_info = "load_id=" + print_id(_parent->_load_id) +
", txn_id=" + std::to_string(_parent->_txn_id);
- _name = fmt::format("NodeChannel[{}-{}]", _index_id, _node_id);
+ _name = fmt::format("NodeChannel[{}-{}]", _index_channel->_index_id,
_node_id);
return Status::OK();
}
void NodeChannel::open() {
PTabletWriterOpenRequest request;
request.set_allocated_id(&_parent->_load_id);
- request.set_index_id(_index_id);
+ request.set_index_id(_index_channel->_index_id);
request.set_txn_id(_parent->_txn_id);
request.set_allocated_schema(_parent->_schema->to_protobuf());
for (auto& tablet : _all_tablets) {
@@ -176,15 +176,27 @@ Status NodeChannel::open_wait() {
// add batch closure
_add_batch_closure =
ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this]() {
- _cancel_with_msg(
- fmt::format("{}, err: {}", channel_info(),
_add_batch_closure->cntl.ErrorText()));
+ // If rpc failed, mark all tablets on this node channel as failed
+ _index_channel->mark_as_failed(this,
_add_batch_closure->cntl.ErrorText(), -1);
+ Status st = _index_channel->check_intolerable_failure();
+ if (!st.ok()) {
+ _cancel_with_msg(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
+ }
});
_add_batch_closure->addSuccessHandler([this](const
PTabletWriterAddBatchResult& result,
bool is_last_rpc) {
Status status(result.status());
if (status.ok()) {
- if (is_last_rpc) {
+ // if has error tablet, handle them first
+ for (auto& error : result.tablet_errors()) {
+ _index_channel->mark_as_failed(this, error.msg(),
error.tablet_id());
+ }
+
+ Status st = _index_channel->check_intolerable_failure();
+ if (!st.ok()) {
+ _cancel_with_msg(st.get_error_msg());
+ } else if (is_last_rpc) {
for (auto& tablet : result.tablet_vec()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
@@ -254,6 +266,8 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t
tablet_id) {
return Status::OK();
}
+// Used for vectorized engine.
+// TODO(cmy): deprecated, need refactor
Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
// If add_row() when _eos_is_produced==true, there must be sth wrong, we
can only mark this channel as failed.
auto st = none_of({_cancelled, _eos_is_produced});
@@ -319,6 +333,7 @@ Status NodeChannel::mark_close() {
_pending_batches.emplace(std::move(_cur_batch),
_cur_add_batch_request);
_pending_batches_num++;
DCHECK(_pending_batches.back().second.eos());
+ LOG(INFO) << channel_info() << " mark closed, left pending batch size:
" << _pending_batches.size();
}
_eos_is_produced = true;
@@ -356,6 +371,8 @@ Status NodeChannel::close_wait(RuntimeState* state) {
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(_tablet_commit_infos.begin()),
std::make_move_iterator(_tablet_commit_infos.end()));
+
+ _index_channel->set_error_tablet_in_state(state);
return Status::OK();
}
@@ -377,7 +394,7 @@ void NodeChannel::cancel(const std::string& cancel_msg) {
PTabletWriterCancelRequest request;
request.set_allocated_id(&_parent->_load_id);
- request.set_index_id(_index_id);
+ request.set_index_id(_index_channel->_index_id);
request.set_sender_id(_parent->_sender_id);
auto closure = new RefCountClosure<PTabletWriterCancelResult>();
@@ -511,7 +528,7 @@ Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPart
auto it = _node_channels.find(node_id);
if (it == _node_channels.end()) {
channel = _parent->_pool->add(
- new NodeChannel(_parent, _index_id, node_id,
_schema_hash));
+ new NodeChannel(_parent, this, node_id, _schema_hash));
_node_channels.emplace(node_id, channel);
} else {
channel = it->second;
@@ -528,59 +545,74 @@ Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPart
return Status::OK();
}
-Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
+void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
auto it = _channels_by_tablet.find(tablet_id);
DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" <<
tablet_id;
- std::stringstream ss;
for (auto channel : it->second) {
// if this node channel is already failed, this add_row will be skipped
auto st = channel->add_row(tuple, tablet_id);
if (!st.ok()) {
- mark_as_failed(channel);
- ss << st.get_error_msg() << "; ";
+ mark_as_failed(channel, st.get_error_msg(), tablet_id);
+ // continue add row to other node, the error will be checked for
every batch outside
}
}
-
- if (has_intolerable_failure()) {
- std::stringstream ss2;
- ss2 << "index channel has intolerable failure. " <<
BackendOptions::get_localhost()
- << ", err: " << ss.str();
- return Status::InternalError(ss2.str());
- }
-
- return Status::OK();
}
-Status IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+// Used for vectorized engine.
+// TODO(cmy): deprecated, need refactor
+void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
auto it = _channels_by_tablet.find(tablet_id);
DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" <<
tablet_id;
- std::stringstream ss;
for (auto channel : it->second) {
// if this node channel is already failed, this add_row will be skipped
auto st = channel->add_row(block_row, tablet_id);
if (!st.ok()) {
- mark_as_failed(channel);
- ss << st.get_error_msg() << "; ";
+ mark_as_failed(channel, st.get_error_msg(), tablet_id);
}
}
+}
- if (has_intolerable_failure()) {
- std::stringstream ss2;
- ss2 << "index channel has intolerable failure. " <<
BackendOptions::get_localhost()
- << ", err: " << ss.str();
- return Status::InternalError(ss2.str());
+void IndexChannel::mark_as_failed(const NodeChannel* ch, const std::string&
err, int64_t tablet_id) {
+ const auto& it = _tablets_by_channel.find(ch->node_id());
+ if (it == _tablets_by_channel.end()) {
+ return;
}
- return Status::OK();
+ {
+ std::lock_guard<SpinLock> l(_fail_lock);
+ if (tablet_id == -1) {
+ for (const auto the_tablet_id : it->second) {
+ _failed_channels[the_tablet_id].insert(ch->node_id());
+ _failed_channels_msgs.emplace(the_tablet_id, err + ", host: "
+ ch->host());
+ if (_failed_channels[the_tablet_id].size() >=
((_parent->_num_replicas + 1) / 2)) {
+ _intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[the_tablet_id]);
+ }
+ }
+ } else {
+ _failed_channels[tablet_id].insert(ch->node_id());
+ _failed_channels_msgs.emplace(tablet_id, err + ", host: " +
ch->host());
+ if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas
+ 1) / 2)) {
+ _intolerable_failure_status =
Status::InternalError(_failed_channels_msgs[tablet_id]);
+ }
+ }
+ }
}
-bool IndexChannel::has_intolerable_failure() {
- for (const auto& it : _failed_channels) {
- if (it.second.size() >= ((_parent->_num_replicas + 1) / 2)) {
- return true;
- }
+Status IndexChannel::check_intolerable_failure() {
+ std::lock_guard<SpinLock> l(_fail_lock);
+ return _intolerable_failure_status;
+}
+
+void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
+ std::vector<TErrorTabletInfo>& error_tablet_infos =
state->error_tablet_infos();
+
+ std::lock_guard<SpinLock> l(_fail_lock);
+ for (const auto& it : _failed_channels_msgs) {
+ TErrorTabletInfo error_info;
+ error_info.__set_tabletId(it.first);
+ error_info.__set_msg(it.second);
+ error_tablet_infos.emplace_back(error_info);
}
- return false;
}
OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
@@ -757,22 +789,17 @@ Status OlapTableSink::open(RuntimeState* state) {
}
for (auto index_channel : _channels) {
- std::stringstream ss;
- index_channel->for_each_node_channel([&index_channel,
&ss](NodeChannel* ch) {
+ index_channel->for_each_node_channel([&index_channel](NodeChannel* ch)
{
auto st = ch->open_wait();
if (!st.ok()) {
- std::stringstream err;
- err << ch->channel_info() << ", tablet open failed, err: " <<
st.get_error_msg();
- LOG(WARNING) << err.str();
- ss << err.str() << "; ";
- index_channel->mark_as_failed(ch);
+ // The open() phase is mainly to generate DeltaWriter
instances on the nodes corresponding to each node channel.
+ // This phase will not fail due to a single tablet.
+ // Therefore, if the open() phase fails, all tablets
corresponding to the node need to be marked as failed.
+ index_channel->mark_as_failed(ch, fmt::format("{}, open
failed, err: {}", ch->channel_info(), st.get_error_msg()), -1);
}
});
- if (index_channel->has_intolerable_failure()) {
- LOG(WARNING) << "open failed, load_id=" << _load_id << ", err: "
<< ss.str();
- return Status::InternalError(ss.str());
- }
+ RETURN_IF_ERROR(index_channel->check_intolerable_failure());
}
int32_t send_batch_parallelism =
MIN(_send_batch_parallelism,
config::max_send_batch_parallelism_per_job);
@@ -844,10 +871,15 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch*
input_batch) {
uint32_t tablet_index = dist_hash % partition->num_buckets;
for (int j = 0; j < partition->indexes.size(); ++j) {
int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
- RETURN_IF_ERROR(_channels[j]->add_row(tuple, tablet_id));
+ _channels[j]->add_row(tuple, tablet_id);
_number_output_rows++;
}
}
+
+ // check intolerable failure
+ for (auto index_channel : _channels) {
+ RETURN_IF_ERROR(index_channel->check_intolerable_failure());
+ }
return Status::OK();
}
@@ -879,14 +911,13 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
for (auto index_channel : _channels) {
int64_t add_batch_exec_time = 0;
index_channel->for_each_node_channel(
- [&status, &state, &node_add_batch_counter_map,
&serialize_batch_ns,
+ [&index_channel, &state, &node_add_batch_counter_map,
&serialize_batch_ns,
&mem_exceeded_block_ns, &queue_push_lock_ns,
&actual_consume_ns,
&total_add_batch_exec_time_ns, &add_batch_exec_time,
&total_add_batch_num](NodeChannel* ch) {
auto s = ch->close_wait(state);
if (!s.ok()) {
- // 'status' will store the last non-ok status
of all channels
- status = s;
+ index_channel->mark_as_failed(ch,
s.get_error_msg(), -1);
LOG(WARNING)
<< ch->channel_info()
<< ", close channel failed, err: " <<
s.get_error_msg();
@@ -900,7 +931,14 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
if (add_batch_exec_time > max_add_batch_exec_time_ns) {
max_add_batch_exec_time_ns = add_batch_exec_time;
}
- }
+
+ // check if index has intolerable failure
+ Status index_st = index_channel->check_intolerable_failure();
+ if (!index_st.ok()) {
+ status = index_st;
+ }
+ } // end for index channels
+
}
// TODO need to be improved
LOG(INFO) << "total mem_exceeded_block_ns=" << mem_exceeded_block_ns
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index df40493..77409e8e 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -144,9 +144,10 @@ private:
std::function<void(const T&, bool)> success_handler;
};
+class IndexChannel;
class NodeChannel {
public:
- NodeChannel(OlapTableSink* parent, int64_t index_id, int64_t node_id,
int32_t schema_hash);
+ NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t
node_id, int32_t schema_hash);
~NodeChannel() noexcept;
// called before open, used to add tablet located in this backend
@@ -195,6 +196,7 @@ public:
}
int64_t node_id() const { return _node_id; }
+ std::string host() const { return _node_info.host; }
std::string name() const { return _name; }
Status none_of(std::initializer_list<bool> vars);
@@ -203,10 +205,7 @@ public:
void clear_all_batches();
std::string channel_info() const {
- // FIXME(cmy): There is a problem that when calling node_info, the
node_info seems not initialized.
- // But I don't know why. so here I print node_info->id
instead of node_info->host
- // to avoid BE crash. It needs further observation.
- return fmt::format("{}, {}, node={}:{}", _name, _load_info,
_node_info.id,
+ return fmt::format("{}, {}, node={}:{}", _name, _load_info,
_node_info.host,
_node_info.brpc_port);
}
@@ -215,7 +214,7 @@ private:
private:
OlapTableSink* _parent = nullptr;
- int64_t _index_id = -1;
+ IndexChannel* _index_channel = nullptr;
int64_t _node_id = -1;
int32_t _schema_hash = 0;
std::string _load_info;
@@ -276,9 +275,9 @@ public:
Status init(RuntimeState* state, const std::vector<TTabletWithPartition>&
tablets);
- Status add_row(Tuple* tuple, int64_t tablet_id);
+ void add_row(Tuple* tuple, int64_t tablet_id);
- Status add_row(BlockRow& block_row, int64_t tablet_id);
+ void add_row(BlockRow& block_row, int64_t tablet_id);
void for_each_node_channel(const std::function<void(NodeChannel*)>& func) {
for (auto& it : _node_channels) {
@@ -286,20 +285,17 @@ public:
}
}
- void mark_as_failed(const NodeChannel* ch) {
- const auto& it = _tablets_by_channel.find(ch->node_id());
- if (it == _tablets_by_channel.end()) {
- return;
- }
- for (const auto tablet_id : it->second) {
- _failed_channels[tablet_id].insert(ch->node_id());
- }
- }
- bool has_intolerable_failure();
+ void mark_as_failed(const NodeChannel* ch, const std::string& err, int64_t
tablet_id = -1);
+ Status check_intolerable_failure();
+
+ // set error tablet info in runtime state, so that it can be returned to
FE.
+ void set_error_tablet_in_state(RuntimeState* state);
size_t num_node_channels() const { return _node_channels.size(); }
private:
+ friend class NodeChannel;
+
OlapTableSink* _parent;
int64_t _index_id;
int32_t _schema_hash;
@@ -310,8 +306,14 @@ private:
std::unordered_map<int64_t, std::vector<NodeChannel*>> _channels_by_tablet;
// from backend channel to tablet_id
std::unordered_map<int64_t, std::unordered_set<int64_t>>
_tablets_by_channel;
+
+ // lock to protect _failed_channels and _failed_channels_msgs
+ mutable SpinLock _fail_lock;
// key is tablet_id, value is a set of failed node id
std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
+ // key is tablet_id, value is error message
+ std::unordered_map<int64_t, std::string> _failed_channels_msgs;
+ Status _intolerable_failure_status = Status::OK();
};
// Write data to Olap Table.
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index bc94c71..e970203 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -284,7 +284,7 @@ OLAPStatus DeltaWriter::close() {
return OLAP_SUCCESS;
}
-OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec) {
+OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec, bool is_broken) {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
@@ -333,13 +333,15 @@ OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
}
#ifndef BE_TEST
- PTabletInfo* tablet_info = tablet_vec->Add();
- tablet_info->set_tablet_id(_tablet->tablet_id());
- tablet_info->set_schema_hash(_tablet->schema_hash());
- if (_new_tablet != nullptr) {
- tablet_info = tablet_vec->Add();
- tablet_info->set_tablet_id(_new_tablet->tablet_id());
- tablet_info->set_schema_hash(_new_tablet->schema_hash());
+ if (!is_broken) {
+ PTabletInfo* tablet_info = tablet_vec->Add();
+ tablet_info->set_tablet_id(_tablet->tablet_id());
+ tablet_info->set_schema_hash(_tablet->schema_hash());
+ if (_new_tablet != nullptr) {
+ tablet_info = tablet_vec->Add();
+ tablet_info->set_tablet_id(_new_tablet->tablet_id());
+ tablet_info->set_schema_hash(_new_tablet->schema_hash());
+ }
}
#endif
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4b80b571c..b8db713 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -68,7 +68,7 @@ public:
OLAPStatus close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
- OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec);
+ OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec, bool is_broken);
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
@@ -88,6 +88,8 @@ public:
// Wait all memtable in flush queue to be flushed
OLAPStatus wait_flush();
+ int64_t tablet_id() { return _tablet->tablet_id(); }
+
private:
DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
StorageEngine* storage_engine);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d3c40e9..0f75ae7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -353,6 +353,13 @@ void FragmentExecState::coordinator_callback(const Status&
status, RuntimeProfil
params.commitInfos.push_back(info);
}
}
+ if (!runtime_state->error_tablet_infos().empty()) {
+ params.__isset.errorTabletInfos = true;
+
params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
+ for (auto& info : runtime_state->error_tablet_infos()) {
+ params.errorTabletInfos.push_back(info);
+ }
+ }
// Send new errors to coordinator
runtime_state->get_unreported_errors(&(params.error_log));
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 27352a0..ee33cc3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -24,8 +24,10 @@
namespace doris {
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t
timeout_s,
- const std::shared_ptr<MemTracker>& mem_tracker, bool
is_high_priority)
- : _load_id(load_id), _timeout_s(timeout_s),
_is_high_priority(is_high_priority) {
+ const std::shared_ptr<MemTracker>& mem_tracker, bool
is_high_priority,
+ const std::string& sender_ip)
+ : _load_id(load_id), _timeout_s(timeout_s),
_is_high_priority(is_high_priority),
+ _sender_ip(sender_ip) {
_mem_tracker = MemTracker::CreateTracker(
mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker,
true, false, MemTrackerLevel::TASK);
// _last_updated_time should be set before being inserted to
@@ -42,9 +44,6 @@ LoadChannel::~LoadChannel() {
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
int64_t index_id = params.index_id();
- if (params.has_sender_ip()) {
- _sender_ip = params.sender_ip();
- }
std::shared_ptr<TabletsChannel> channel;
{
std::lock_guard<std::mutex> l(_lock);
@@ -67,7 +66,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
}
Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
- google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec) {
+ PTabletWriterAddBatchResult* response) {
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
@@ -91,7 +90,7 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBatchRequest& request,
// 3. add batch to tablets channel
if (request.has_row_batch()) {
- RETURN_IF_ERROR(channel->add_batch(request));
+ RETURN_IF_ERROR(channel->add_batch(request, response));
}
// 4. handle eos
@@ -100,7 +99,7 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBatchRequest& request,
bool finished = false;
RETURN_IF_ERROR(channel->close(request.sender_id(),
request.backend_id(),
&finished, request.partition_ids(),
- tablet_vec));
+ response->mutable_tablet_vec()));
if (finished) {
std::lock_guard<std::mutex> l(_lock);
_tablets_channels.erase(index_id);
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 257dba8..13490f5 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,7 +39,8 @@ class TabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
- const std::shared_ptr<MemTracker>& mem_tracker, bool
is_high_priority);
+ const std::shared_ptr<MemTracker>& mem_tracker, bool
is_high_priority,
+ const std::string& sender_ip);
~LoadChannel();
// open a new load channel if not exist
@@ -47,7 +48,7 @@ public:
// this batch must belong to a index in one transaction
Status add_batch(const PTabletWriterAddBatchRequest& request,
- google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec);
+ PTabletWriterAddBatchResult* response);
// return true if this load channel has been opened and all tablets
channels are closed then.
bool is_finished();
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 0b3367a..baa0e88 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -112,7 +112,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest&
params) {
int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s);
bool is_high_priority = (params.has_is_high_priority() &&
params.is_high_priority());
- channel.reset(new LoadChannel(load_id, job_max_memory,
job_timeout_s, _mem_tracker, is_high_priority));
+ channel.reset(new LoadChannel(load_id, job_max_memory,
job_timeout_s, _mem_tracker, is_high_priority,
+ params.sender_ip()));
_load_channels.insert({load_id, channel});
}
}
@@ -124,7 +125,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest&
params) {
static void dummy_deleter(const CacheKey& key, void* value) {}
Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
-
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
+ PTabletWriterAddBatchResult* response) {
UniqueId load_id(request.id());
// 1. get load channel
std::shared_ptr<LoadChannel> channel;
@@ -156,7 +157,7 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBatchRequest& request,
// 3. add batch to load channel
// batch may not exist in request(eg: eos request without batch),
// this case will be handled in load channel's add batch method.
- RETURN_IF_ERROR(channel->add_batch(request, tablet_vec));
+ RETURN_IF_ERROR(channel->add_batch(request, response));
// 4. handle finish
if (channel->is_finished()) {
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 450c8bf..1da0ec7 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -51,7 +51,7 @@ public:
Status open(const PTabletWriterOpenRequest& request);
Status add_batch(const PTabletWriterAddBatchRequest& request,
- google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec);
+ PTabletWriterAddBatchResult* response);
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 8797076..449b4c2 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -368,6 +368,12 @@ public:
std::vector<TTabletCommitInfo>& tablet_commit_infos() { return
_tablet_commit_infos; }
+ const std::vector<TErrorTabletInfo>& error_tablet_infos() const {
+ return _error_tablet_infos;
+ }
+
+ std::vector<TErrorTabletInfo>& error_tablet_infos() { return
_error_tablet_infos; }
+
/// Helper to call QueryState::StartSpilling().
Status StartSpilling(MemTracker* mem_tracker);
@@ -508,6 +514,7 @@ private:
std::unique_ptr<LoadErrorHub> _error_hub;
std::mutex _create_error_hub_lock;
std::vector<TTabletCommitInfo> _tablet_commit_infos;
+ std::vector<TErrorTabletInfo> _error_tablet_infos;
//TODO chenhao , remove this to QueryState
/// Pool of buffer reservations used to distribute initial reservations to
operators
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 68370f6..75dc64f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -51,33 +51,34 @@ TabletsChannel::~TabletsChannel() {
delete _schema;
}
-Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
+Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kOpened) {
// Normal case, already open by other sender
return Status::OK();
}
- LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " <<
params.tablets().size()
- << ", timeout(s): " << params.load_channel_timeout_s();
- _txn_id = params.txn_id();
- _index_id = params.index_id();
+ LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " <<
request.tablets().size()
+ << ", timeout(s): " << request.load_channel_timeout_s();
+ _txn_id = request.txn_id();
+ _index_id = request.index_id();
_schema = new OlapTableSchemaParam();
- RETURN_IF_ERROR(_schema->init(params.schema()));
+ RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();
_row_desc = new RowDescriptor(_tuple_desc, false);
- _num_remaining_senders = params.num_senders();
+ _num_remaining_senders = request.num_senders();
_next_seqs.resize(_num_remaining_senders, 0);
_closed_senders.Reset(_num_remaining_senders);
- RETURN_IF_ERROR(_open_all_writers(params));
+ RETURN_IF_ERROR(_open_all_writers(request));
_state = kOpened;
return Status::OK();
}
-Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
- DCHECK(params.tablet_ids_size() == params.row_batch().num_rows());
+Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
+ PTabletWriterAddBatchResult* response) {
+ DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
int64_t cur_seq;
{
std::lock_guard<std::mutex> l(_lock);
@@ -87,23 +88,27 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBatchRequest& params) {
: Status::InternalError(strings::Substitute("TabletsChannel $0
state: $1",
_key.to_string(), _state));
}
- cur_seq = _next_seqs[params.sender_id()];
+ cur_seq = _next_seqs[request.sender_id()];
// check packet
- if (params.packet_seq() < cur_seq) {
+ if (request.packet_seq() < cur_seq) {
LOG(INFO) << "packet has already recept before, expect_seq=" <<
cur_seq
- << ", recept_seq=" << params.packet_seq();
+ << ", recept_seq=" << request.packet_seq();
return Status::OK();
- } else if (params.packet_seq() > cur_seq) {
+ } else if (request.packet_seq() > cur_seq) {
LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
- << ", recept_seq=" << params.packet_seq();
+ << ", recept_seq=" << request.packet_seq();
return Status::InternalError("lost data packet");
}
}
- RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
+ RowBatch row_batch(*_row_desc, request.row_batch(), _mem_tracker.get());
std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index
*/> tablet_to_rowidxs;
- for (int i = 0; i < params.tablet_ids_size(); ++i) {
- int64_t tablet_id = params.tablet_ids(i);
+ for (int i = 0; i < request.tablet_ids_size(); ++i) {
+ int64_t tablet_id = request.tablet_ids(i);
+ if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) {
+ // skip broken tablets
+ continue;
+ }
auto it = tablet_to_rowidxs.find(tablet_id);
if (it == tablet_to_rowidxs.end()) {
tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i
});
@@ -112,6 +117,7 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBatchRequest& params) {
}
}
+ google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
response->mutable_tablet_errors();
for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
auto tablet_writer_it =
_tablet_writers.find(tablet_to_rowidxs_it.first);
if (tablet_writer_it == _tablet_writers.end()) {
@@ -125,13 +131,18 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBatchRequest& params) {
"tablet writer write failed, tablet_id=$0, txn_id=$1,
err=$2",
tablet_to_rowidxs_it.first, _txn_id, st);
LOG(WARNING) << err_msg;
- return Status::InternalError(err_msg);
+ PTabletError* error = tablet_errors->Add();
+ error->set_tablet_id(tablet_to_rowidxs_it.first);
+ error->set_msg(err_msg);
+ _broken_tablets.insert(tablet_to_rowidxs_it.first);
+ // continue write to other tablet.
+ // the error will return back to sender.
}
}
{
std::lock_guard<std::mutex> l(_lock);
- _next_seqs[params.sender_id()] = cur_seq + 1;
+ _next_seqs[request.sender_id()] = cur_seq + 1;
}
return Status::OK();
}
@@ -186,7 +197,7 @@ Status TabletsChannel::close(int sender_id, int64_t
backend_id, bool* finished,
for (auto writer : need_wait_writers) {
 // close may return failed, but no need to handle it here.
 // tablet_vec will only contain successful tablets, and then let FE
 judge them.
- writer->close_wait(tablet_vec);
+ writer->close_wait(tablet_vec,
(_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
}
// TODO(gaodayue) clear and destruct all delta writers to make sure
all memory are freed
// DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -249,7 +260,7 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
return Status::OK();
}
-Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest&
params) {
+Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest&
request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (auto& index : _schema->indexes()) {
@@ -264,21 +275,21 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& params)
ss << "unknown index id, key=" << _key;
return Status::InternalError(ss.str());
}
- for (auto& tablet : params.tablets()) {
- WriteRequest request;
- request.tablet_id = tablet.tablet_id();
- request.schema_hash = schema_hash;
- request.write_type = WriteType::LOAD;
- request.txn_id = _txn_id;
- request.partition_id = tablet.partition_id();
- request.load_id = params.id();
- request.need_gen_rollup = params.need_gen_rollup();
- request.tuple_desc = _tuple_desc;
- request.slots = index_slots;
- request.is_high_priority = _is_high_priority;
+ for (auto& tablet : request.tablets()) {
+ WriteRequest wrequest;
+ wrequest.tablet_id = tablet.tablet_id();
+ wrequest.schema_hash = schema_hash;
+ wrequest.write_type = WriteType::LOAD;
+ wrequest.txn_id = _txn_id;
+ wrequest.partition_id = tablet.partition_id();
+ wrequest.load_id = request.id();
+ wrequest.need_gen_rollup = request.need_gen_rollup();
+ wrequest.tuple_desc = _tuple_desc;
+ wrequest.slots = index_slots;
+ wrequest.is_high_priority = _is_high_priority;
DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&request, _mem_tracker, &writer);
+ auto st = DeltaWriter::open(&wrequest, _mem_tracker, &writer);
if (st != OLAP_SUCCESS) {
std::stringstream ss;
ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
@@ -290,7 +301,7 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& params)
_tablet_writers.emplace(tablet.tablet_id(), writer);
}
_s_tablet_writer_count += _tablet_writers.size();
- DCHECK_EQ(_tablet_writers.size(), params.tablets_size());
+ DCHECK_EQ(_tablet_writers.size(), request.tablets_size());
return Status::OK();
}
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 11144cb..e99ac62 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -58,10 +58,10 @@ public:
~TabletsChannel();
- Status open(const PTabletWriterOpenRequest& params);
+ Status open(const PTabletWriterOpenRequest& request);
// no-op when this channel has been closed or cancelled
- Status add_batch(const PTabletWriterAddBatchRequest& batch);
+ Status add_batch(const PTabletWriterAddBatchRequest& request,
PTabletWriterAddBatchResult* response);
// Mark sender with 'sender_id' as closed.
// If all senders are closed, close this channel, set '*finished' to true,
update 'tablet_vec'
@@ -84,7 +84,7 @@ public:
private:
// open all writer
- Status _open_all_writers(const PTabletWriterOpenRequest& params);
+ Status _open_all_writers(const PTabletWriterOpenRequest& request);
private:
// id of this load channel
@@ -118,6 +118,10 @@ private:
// tablet_id -> TabletChannel
std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
+ // Broken tablet ids.
+ // If a tablet write fails, its id will be added to this set,
+ // so that subsequent batches will not handle this tablet anymore.
+ std::unordered_set<int64_t> _broken_tablets;
std::unordered_set<int64_t> _partition_ids;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 4106963..a948db8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -137,8 +137,7 @@ void
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
SCOPED_RAW_TIMER(&execution_time_ns);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request,
cntl);
- auto st = _exec_env->load_channel_mgr()->add_batch(*request,
-
response->mutable_tablet_vec());
+ auto st = _exec_env->load_channel_mgr()->add_batch(*request,
response);
if (!st.ok()) {
LOG(WARNING) << "tablet writer add batch failed, message=" <<
st.get_error_msg()
<< ", id=" << request->id() << ", index_id=" <<
request->index_id()
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 8b0f100..baf3b04 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -109,7 +109,7 @@ Status VOlapTableSink::send(RuntimeState* state,
vectorized::Block* input_block)
uint32_t tablet_index = dist_hash % partition->num_buckets;
for (int j = 0; j < partition->indexes.size(); ++j) {
int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
- RETURN_IF_ERROR(_channels[j]->add_row(block_row, tablet_id));
+ _channels[j]->add_row(block_row, tablet_id);
_number_output_rows++;
}
}
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index a0573b5..c4eb7f9 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -376,7 +376,7 @@ TEST_F(TestDeltaWriter, open) {
ASSERT_NE(delta_writer, nullptr);
res = delta_writer->close();
ASSERT_EQ(OLAP_SUCCESS, res);
- res = delta_writer->close_wait(nullptr);
+ res = delta_writer->close_wait(nullptr, false);
ASSERT_EQ(OLAP_SUCCESS, res);
SAFE_DELETE(delta_writer);
@@ -472,7 +472,7 @@ TEST_F(TestDeltaWriter, write) {
res = delta_writer->close();
ASSERT_EQ(OLAP_SUCCESS, res);
- res = delta_writer->close_wait(nullptr);
+ res = delta_writer->close_wait(nullptr, false);
ASSERT_EQ(OLAP_SUCCESS, res);
// publish version success
@@ -552,7 +552,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
res = delta_writer->close();
ASSERT_EQ(OLAP_SUCCESS, res);
- res = delta_writer->close_wait(nullptr);
+ res = delta_writer->close_wait(nullptr, false);
ASSERT_EQ(OLAP_SUCCESS, res);
// publish version success
diff --git a/be/test/runtime/load_channel_mgr_test.cpp
b/be/test/runtime/load_channel_mgr_test.cpp
index f8b2374..5664f45 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -85,7 +85,7 @@ OLAPStatus DeltaWriter::close() {
return OLAP_SUCCESS;
}
-OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec) {
+OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec, bool is_broken) {
return close_status;
}
@@ -257,8 +257,9 @@ TEST_F(LoadChannelMgrTest, normal) {
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec);
+ // google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
request.release_id();
ASSERT_TRUE(st.ok());
}
@@ -423,11 +424,14 @@ TEST_F(LoadChannelMgrTest, add_failed) {
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
+ // DeltaWriter's write will return -215
add_status = OLAP_ERR_TABLE_NOT_FOUND;
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec);
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
request.release_id();
- ASSERT_FALSE(st.ok());
+ // st is still ok.
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ(2, response.tablet_errors().size());
}
}
@@ -514,12 +518,12 @@ TEST_F(LoadChannelMgrTest, close_failed) {
}
row_batch.serialize(request.mutable_row_batch());
close_status = OLAP_ERR_TABLE_NOT_FOUND;
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec);
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
request.release_id();
// even if delta close failed, the return status is still ok, but
tablet_vec is empty
ASSERT_TRUE(st.ok());
- ASSERT_TRUE(tablet_vec.empty());
+ ASSERT_TRUE(response.tablet_vec().empty());
}
}
@@ -602,8 +606,8 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec);
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
request.release_id();
ASSERT_FALSE(st.ok());
}
@@ -688,11 +692,11 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
row_batch.commit_last_row();
}
row_batch.serialize(request.mutable_row_batch());
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec1;
- auto st = mgr.add_batch(request, &tablet_vec1);
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
ASSERT_TRUE(st.ok());
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec2;
- st = mgr.add_batch(request, &tablet_vec2);
+ PTabletWriterAddBatchResult response2;
+ st = mgr.add_batch(request, &response2);
request.release_id();
ASSERT_TRUE(st.ok());
}
@@ -704,8 +708,8 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
request.set_sender_id(0);
request.set_eos(true);
request.set_packet_seq(0);
- google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec);
+ PTabletWriterAddBatchResult response;
+ auto st = mgr.add_batch(request, &response);
request.release_id();
ASSERT_TRUE(st.ok());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index f1a9c56..26658a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -35,7 +35,7 @@ public class LoadProcDir implements ProcDirInterface {
.add("JobId").add("Label").add("State").add("Progress")
.add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime")
.add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime")
- .add("URL").add("JobDetails")
+
.add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets")
.build();
// label and state column index of result
@@ -65,10 +65,10 @@ public class LoadProcDir implements ProcDirInterface {
// merge load job from load and loadManager
LinkedList<List<Comparable>> loadJobInfos =
load.getLoadJobInfosByDb(db.getId(), db.getFullName(),
-
null, false, null);
+ null, false, null);
loadJobInfos.addAll(Catalog.getCurrentCatalog().getLoadManager().getLoadJobInfosByDb(db.getId(),
null,
-
false,
-
null));
+ false,
+ null));
int counter = 0;
Iterator<List<Comparable>> iterator =
loadJobInfos.descendingIterator();
while (iterator.hasNext()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
index 532a6b5..076e1f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/ldap/LdapAuthenticate.java
@@ -29,8 +29,8 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
@@ -73,7 +73,7 @@ public class LdapAuthenticate {
try {
if (!LdapClient.checkPassword(userName, password)) {
LOG.debug("user:{} use error LDAP password.", userName);
- ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR,
qualifiedUser, usePasswd);
+ ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR,
qualifiedUser, context.getRemoteIP(), usePasswd);
return false;
}
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index c0290f1..6f03f4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -220,7 +220,7 @@ public class BrokerLoadJob extends BulkLoadJob {
txnState.addTableIndexes(table);
}
} finally {
- MetaLockUtils.readUnlockTables(tableList);
+ MetaLockUtils.readUnlockTables(tableList);
}
// Submit task outside the database lock, cause it may take a while if
task queue is full.
for (LoadTask loadTask : newLoadingTasks) {
@@ -342,6 +342,8 @@ public class BrokerLoadJob extends BulkLoadJob {
loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
}
commitInfos.addAll(attachment.getCommitInfoList());
+ errorTabletInfos.addAll(attachment.getErrorTabletInfos());
+
progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() *
100);
if (progress == 100) {
progress = 99;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
index 0a6acd9..7aefd33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
import java.util.List;
@@ -27,13 +28,16 @@ public class BrokerLoadingTaskAttachment extends
TaskAttachment {
private Map<String, String> counters;
private String trackingUrl;
private List<TabletCommitInfo> commitInfoList;
+ List<ErrorTabletInfo> errorTabletInfos;
public BrokerLoadingTaskAttachment(long taskId, Map<String, String>
counters, String trackingUrl,
- List<TabletCommitInfo> commitInfoList) {
+ List<TabletCommitInfo> commitInfoList,
+ List<ErrorTabletInfo> errorTabletInfos)
{
super(taskId);
this.trackingUrl = trackingUrl;
this.counters = counters;
this.commitInfoList = commitInfoList;
+ this.errorTabletInfos = errorTabletInfos;
}
public String getCounter(String key) {
@@ -47,4 +51,8 @@ public class BrokerLoadingTaskAttachment extends
TaskAttachment {
public List<TabletCommitInfo> getCommitInfoList() {
return commitInfoList;
}
+
+ public List<ErrorTabletInfo> getErrorTabletInfos() {
+ return errorTabletInfos;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 5992bed..756f6ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -56,6 +56,7 @@ import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
@@ -69,6 +70,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
@@ -129,6 +131,8 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// only for persistence param. see readFields() for usage
private boolean isJobTypeRead = false;
+ protected List<ErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
+
public static class LoadStatistic {
// number of rows processed on BE, this number will be updated
periodically by query report.
// A load job may has several load tasks(queries), and each task has
several fragments.
@@ -773,12 +777,23 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
// tracking url
jobInfo.add(loadingStatus.getTrackingUrl());
jobInfo.add(loadStatistic.toJson());
+ // transaction id
+ jobInfo.add(transactionId);
+ // error tablets
+ jobInfo.add(errorTabletsToJson());
return jobInfo;
} finally {
readUnlock();
}
}
+ public String errorTabletsToJson() {
+ Map<Long, String> map = Maps.newHashMap();
+ errorTabletInfos.stream().limit(3).forEach(p ->
map.put(p.getTabletId(), p.getMsg()));
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(map);
+ }
+
protected String getResourceName() {
return "N/A";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index dc6c371..6ff10fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -36,6 +36,7 @@ import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.logging.log4j.LogManager;
@@ -109,7 +110,7 @@ public class LoadLoadingTask extends LoadTask {
}
@Override
- protected void executeTask() throws Exception{
+ protected void executeTask() throws Exception {
LOG.info("begin to execute loading task. load id: {} job id: {}. db:
{}, tbl: {}. left retry: {}",
DebugUtil.printId(loadId), callback.getCallbackId(),
db.getFullName(), table.getName(), retryTime);
retryTime--;
@@ -127,8 +128,8 @@ public class LoadLoadingTask extends LoadTask {
/*
* For broker load job, user only need to set mem limit by
'exec_mem_limit' property.
* And the variable 'load_mem_limit' does not make any effect.
- * However, in order to ensure the consistency of semantics when
executing on the BE side,
- * and to prevent subsequent modification from incorrectly setting the
load_mem_limit,
+ * However, in order to ensure the consistency of semantics when
executing on the BE side,
+ * and to prevent subsequent modification from incorrectly setting the
load_mem_limit,
* here we use exec_mem_limit to directly override the load_mem_limit
property.
*/
curCoordinator.setLoadMemLimit(execMemLimit);
@@ -150,10 +151,10 @@ public class LoadLoadingTask extends LoadTask {
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
- .add("task_id", signature)
- .add("query_id",
DebugUtil.printId(curCoordinator.getQueryId()))
- .add("msg", "begin to execute plan")
- .build());
+ .add("task_id", signature)
+ .add("query_id",
DebugUtil.printId(curCoordinator.getQueryId()))
+ .add("msg", "begin to execute plan")
+ .build());
}
curCoordinator.exec();
if (curCoordinator.join(waitSecond)) {
@@ -162,7 +163,8 @@ public class LoadLoadingTask extends LoadTask {
attachment = new BrokerLoadingTaskAttachment(signature,
curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl(),
-
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
+
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()),
+
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()));
// Create profile of this task and add to the job profile.
createProfile(curCoordinator);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7d4ad46..6e8c66c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -280,7 +280,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// If user does not specify kafka partition,
// We will fetch partition from kafka server periodically
if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE) {
- if (customKafkaPartitions == null &&
!customKafkaPartitions.isEmpty()) {
+ if (customKafkaPartitions != null &&
!customKafkaPartitions.isEmpty()) {
return;
}
updateKafkaPartitions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index ae55381..b323600 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -56,7 +56,7 @@ public class MysqlProto {
List<UserIdentity> currentUserIdentity = Lists.newArrayList();
if
(!Catalog.getCurrentCatalog().getAuth().checkPassword(qualifiedUser, remoteIp,
scramble, randomString, currentUserIdentity)) {
- ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR,
qualifiedUser, usePasswd);
+ ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR,
qualifiedUser, context.getRemoteIP(), usePasswd);
return false;
}
@@ -70,7 +70,7 @@ public class MysqlProto {
String tmpUser = user;
if (tmpUser == null || tmpUser.isEmpty()) {
- ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, "no user",
usePasswd);
+ ErrorReport.report(ErrorCode.ERR_ACCESS_DENIED_ERROR, "anonym@" +
context.getRemoteIP(), usePasswd);
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 4290dd1..4afcabc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -520,6 +520,10 @@ public class ConnectContext {
return currentConnectedFEIp;
}
+ public String getRemoteIp() {
+ return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp();
+ }
+
public class ThreadInfo {
public List<String> toRow(long nowMs) {
List<String> row = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 51d5dea..faee830 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -470,19 +470,6 @@ public class ConnectProcessor {
}
ctx.setThreadLocalInfo();
-
- if (ctx.getCurrentUserIdentity() == null) {
- // if we upgrade Master FE first, the request from old FE does not
set "current_user_ident".
- // so ctx.getCurrentUserIdentity() will get null, and causing
NullPointerException after using it.
- // return error directly.
- TMasterOpResult result = new TMasterOpResult();
- ctx.getState().setError(ErrorCode.ERR_COMMON_ERROR, "Missing
current user identity. You need to upgrade this Frontend " +
- "to the same version as Master Frontend.");
-
result.setMaxJournalId(Catalog.getCurrentCatalog().getMaxJournalId().longValue());
- result.setPacket(getResultPacket());
- return result;
- }
-
StmtExecutor executor = null;
try {
// 0 for compatibility.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 87d9c2b..407bbea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -65,6 +65,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TLoadErrorHubInfo;
@@ -198,6 +199,7 @@ public class Coordinator {
private List<String> exportFiles;
private List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
+ private List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
// Input parameter
private long jobId = -1; // job which this task belongs to
@@ -362,6 +364,10 @@ public class Coordinator {
return commitInfos;
}
+ public List<TErrorTabletInfo> getErrorTabletInfos() {
+ return errorTabletInfos;
+ }
+
// Initialize
private void prepare() {
for (PlanFragment fragment : fragments) {
@@ -472,11 +478,11 @@ public class Coordinator {
&& ((ResultFileSink) topDataSink).getStorageType() ==
StorageBackend.StorageType.BROKER) {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink)
topDataSink;
- FsBroker broker =
Catalog.getCurrentCatalog().getBrokerMgr()
- .getBroker(topResultFileSink.getBrokerName(),
execBeAddr.getHostname());
+ FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr()
+ .getBroker(topResultFileSink.getBrokerName(),
execBeAddr.getHostname());
topResultFileSink.setBrokerAddr(broker.ip, broker.port);
}
- } else {
+ } else {
// This is a load process.
this.queryOptions.setIsReportSuccess(true);
deltaUrls = Lists.newArrayList();
@@ -704,6 +710,15 @@ public class Coordinator {
}
}
+ private void updateErrorTabletInfos(List<TErrorTabletInfo>
errorTabletInfos) {
+ lock.lock();
+ try {
+ this.errorTabletInfos.addAll(errorTabletInfos);
+ } finally {
+ lock.unlock();
+ }
+ }
+
private void updateStatus(Status status, TUniqueId instanceId) {
lock.lock();
try {
@@ -1193,10 +1208,10 @@ public class Coordinator {
// Traverse the expected runtimeFilterID in each fragment, and establish
the corresponding relationship
// between runtimeFilterID and fragment instance addr and select the merge
instance of runtimeFilter
private void assignRuntimeFilterAddr() throws Exception {
- for (PlanFragment fragment: fragments) {
+ for (PlanFragment fragment : fragments) {
FragmentExecParams params =
fragmentExecParamsMap.get(fragment.getFragmentId());
// Transform <fragment, runtimeFilterId> to <runtimeFilterId,
fragment>
- for (RuntimeFilterId rid: fragment.getTargetRuntimeFilterIds()) {
+ for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
List<FRuntimeFilterTargetParam> targetFragments =
ridToTargetParam.computeIfAbsent(rid, k -> new
ArrayList<>());
for (final FInstanceExecParam instance :
params.instanceExecParams) {
@@ -1204,7 +1219,7 @@ public class Coordinator {
}
}
- for (RuntimeFilterId rid: fragment.getBuilderRuntimeFilterIds()) {
+ for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
ridToBuilderNum.merge(rid, params.instanceExecParams.size(),
Integer::sum);
}
}
@@ -1262,7 +1277,7 @@ public class Coordinator {
// weather we can overwrite the first parameter or not?
private List<TScanRangeParams> findOrInsert(Map<Integer,
List<TScanRangeParams>> m, Integer key,
- ArrayList<TScanRangeParams> defaultVal) {
+ ArrayList<TScanRangeParams>
defaultVal) {
List<TScanRangeParams> value = m.get(key);
if (value == null) {
m.put(key, defaultVal);
@@ -1419,8 +1434,8 @@ public class Coordinator {
}
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations
seqLocation,
- HashMap<TNetworkAddress, Long>
assignedBytesPerHost,
- Reference<Long> backendIdRef) throws
UserException {
+
HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+ Reference<Long>
backendIdRef) throws UserException {
if (!Config.enable_local_replica_selection) {
return selectBackendsByRoundRobin(seqLocation.getLocations(),
assignedBytesPerHost, backendIdRef);
}
@@ -1541,6 +1556,9 @@ public class Coordinator {
if (params.isSetCommitInfos()) {
updateCommitInfos(params.getCommitInfos());
}
+ if (params.isSetErrorTabletInfos()) {
+ updateErrorTabletInfos(params.getErrorTabletInfos());
+ }
profileDoneSignal.markedCountDown(params.getFragmentInstanceId(),
-1L);
}
@@ -1706,7 +1724,7 @@ public class Coordinator {
// make sure each host have average bucket to scan
private void
getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
PlanFragmentId fragmentId, Integer bucketSeq,
- ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress,
Long> addressToBackendID) throws Exception {
+
ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long>
addressToBackendID) throws Exception {
Map<Long, Integer> buckendIdToBucketCountMap =
fragmentIdToBuckendIdBucketCountMap.get(fragmentId);
int maxBucketNum = Integer.MAX_VALUE;
long buckendId = Long.MAX_VALUE;
@@ -1860,7 +1878,7 @@ public class Coordinator {
long lastMissingHeartbeatTime = -1;
public BackendExecState(PlanFragmentId fragmentId, int instanceId, int
profileFragmentId,
- TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long>
addressToBackendID) {
+ TExecPlanFragmentParams rpcParams,
Map<TNetworkAddress, Long> addressToBackendID) {
this.profileFragmentId = profileFragmentId;
this.fragmentId = fragmentId;
this.instanceId = instanceId;
@@ -2076,18 +2094,18 @@ public class Coordinator {
params.params.setRuntimeFilterParams(new
TRuntimeFilterParams());
params.params.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
- for (Map.Entry<RuntimeFilterId,
List<FRuntimeFilterTargetParam>> entry: ridToTargetParam.entrySet()) {
+ for (Map.Entry<RuntimeFilterId,
List<FRuntimeFilterTargetParam>> entry : ridToTargetParam.entrySet()) {
List<TRuntimeFilterTargetParams> targetParams =
Lists.newArrayList();
- for (FRuntimeFilterTargetParam targetParam:
entry.getValue()) {
+ for (FRuntimeFilterTargetParam targetParam :
entry.getValue()) {
targetParams.add(new
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
targetParam.targetFragmentInstanceAddr));
}
params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(),
targetParams);
}
- for (Map.Entry<RuntimeFilterId, Integer> entry:
ridToBuilderNum.entrySet()) {
+ for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(entry.getKey().asInt(),
entry.getValue());
}
- for (RuntimeFilter rf: assignedRuntimeFilters) {
+ for (RuntimeFilter rf : assignedRuntimeFilters) {
params.params.runtime_filter_params.putToRidToRuntimeFilter(rf.getFilterId().asInt(),
rf.toThrift());
}
}
@@ -2185,7 +2203,7 @@ public class Coordinator {
}
public FInstanceExecParam(TUniqueId id, TNetworkAddress host,
- int perFragmentInstanceIdx, FragmentExecParams
fragmentExecParams) {
+ int perFragmentInstanceIdx,
FragmentExecParams fragmentExecParams) {
this.instanceId = id;
this.host = host;
this.perFragmentInstanceIdx = perFragmentInstanceIdx;
@@ -2229,7 +2247,8 @@ public class Coordinator {
// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
- public TUniqueId targetFragmentInstanceId;;
+ public TUniqueId targetFragmentInstanceId;
+ ;
public TNetworkAddress targetFragmentInstanceAddr;
public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java
new file mode 100644
index 0000000..6410888
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/ErrorTabletInfo.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.thrift.TErrorTabletInfo;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class ErrorTabletInfo {
+ private long tabletId;
+ private String msg;
+
+ public ErrorTabletInfo(long tabletId, String msg) {
+ this.tabletId = tabletId;
+ this.msg = msg;
+ }
+
+ public long getTabletId() {
+ return tabletId;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public static List<ErrorTabletInfo> fromThrift(List<TErrorTabletInfo>
errorTabletInfos) {
+ List<ErrorTabletInfo> errorInfos = Lists.newArrayList();
+ for (TErrorTabletInfo tErrorInfo : errorTabletInfos) {
+ errorInfos.add(new ErrorTabletInfo(tErrorInfo.getTabletId(),
tErrorInfo.getMsg()));
+ }
+ return errorInfos;
+ }
+}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 7de412d..b78a64e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -105,12 +105,18 @@ message PTabletWriterAddBatchRequest {
optional bool is_high_priority = 11 [default = false];
};
+message PTabletError {
+ optional int64 tablet_id = 1;
+ optional string msg = 2;
+}
+
message PTabletWriterAddBatchResult {
required PStatus status = 1;
repeated PTabletInfo tablet_vec = 2;
optional int64 execution_time_us = 3;
optional int64 wait_lock_time_us = 4;
optional int64 wait_execution_time_us = 5;
+ repeated PTabletError tablet_errors = 6;
};
// tablet writer cancel
diff --git a/gensrc/script/gen_build_version.sh
b/gensrc/script/gen_build_version.sh
index b9a3f1a..2937cdb 100755
--- a/gensrc/script/gen_build_version.sh
+++ b/gensrc/script/gen_build_version.sh
@@ -25,7 +25,7 @@
# contains the build version based on the git hash or svn revision.
##############################################################
-build_version="pre-0.15.0"
+build_version="trunk"
unset LANG
unset LC_CTYPE
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 1b5cade..2818196 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -400,6 +400,8 @@ struct TReportExecStatusParams {
16: optional i64 backend_id
17: optional i64 loaded_bytes
+
+ 18: optional list<Types.TErrorTabletInfo> errorTabletInfos
}
struct TFeResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index c69f0d9..67a40c1 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -408,6 +408,11 @@ struct TTabletCommitInfo {
2: required i64 backendId
}
+struct TErrorTabletInfo {
+ 1: optional i64 tabletId
+ 2: optional string msg
+}
+
enum TLoadType {
MANUL_LOAD,
ROUTINE_LOAD,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]