This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch stream-load-vec
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/stream-load-vec by this push:
new 687dc8a643 [Bug] Fix some bug in vec stream load (#9094)
687dc8a643 is described below
commit 687dc8a643c4b6e3d5a2a8e5f8e60848de094a68
Author: HappenLee <[email protected]>
AuthorDate: Thu Apr 21 10:59:59 2022 +0800
[Bug] Fix some bug in vec stream load (#9094)
1. Fix a memory leak of the aggregate data held by rows in MemTable.
2. Fix a core dump in the REPLACE aggregate function.
3. Fix a core dump caused by a failed DCHECK in the tablet sink.
Co-authored-by: lihaopeng <[email protected]>
---
be/src/exec/tablet_sink.h | 3 -
be/src/olap/memtable.cpp | 10 +--
be/src/olap/memtable.h | 8 +-
.../aggregate_function_reader.cpp | 41 +++++----
.../aggregate_function_reader.h | 7 +-
.../aggregate_function_simple_factory.cpp | 4 +-
.../aggregate_function_window.cpp | 96 ++--------------------
.../aggregate_function_window.h | 69 +++++++++++++---
be/src/vec/olap/block_reader.cpp | 2 +-
be/src/vec/sink/vtablet_sink.cpp | 23 ++++--
10 files changed, 120 insertions(+), 143 deletions(-)
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 900f60a6a8..a2edb73b19 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -273,9 +273,6 @@ protected:
// add batches finished means the last rpc has be response, used to check
whether this channel can be closed
std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized
- // TODO(cmy): should be removed
- std::atomic<bool> _last_patch_processed_finished {true}; // reuse for
vectorized
-
bool _eos_is_produced {false}; // only for restricting producer behaviors
std::unique_ptr<RowDescriptor> _row_desc;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 66d6a04fc2..2628b34007 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -28,8 +28,10 @@
#include "util/doris_metrics.h"
#include "vec/core/field.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
namespace doris {
+
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema*
tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs,
TupleDescriptor* tuple_desc,
KeysType keys_type, RowsetWriter* rowset_writer,
@@ -77,13 +79,7 @@ void MemTable::_init_agg_functions(const vectorized::Block*
block)
->column(cid)
.aggregation();
std::string agg_name =
- TabletColumn::get_string_by_aggregation_type(agg_method);
- if (agg_name=="REPLACE"){
- agg_name = "last_value";
- }else{
- agg_name += "_reader";
- }
-
+ TabletColumn::get_string_by_aggregation_type(agg_method) +
vectorized::AGG_LOAD_SUFFIX;
std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
[](unsigned char c) { return std::tolower(c); });
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 02f7699719..c5f519eeeb 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -28,6 +28,7 @@
#include "vec/core/block.h"
#include "vec/common/string_ref.h"
#include "vec/aggregate_functions/aggregate_function.h"
+
namespace doris {
struct ContiguousRow;
@@ -37,8 +38,6 @@ class SlotDescriptor;
class TabletSchema;
class Tuple;
class TupleDescriptor;
-class RowInBlock;
-class RowInBlockComparator;
class MemTable {
public:
@@ -103,8 +102,11 @@ private:
NullState null_state = is_null ? NullState::IS_NULL :
NullState::NOT_NULL;
return RowCursorCell(ref.data, null_state);
}
+
~RowInBlock() {
- std::vector<vectorized::AggregateDataPtr>().swap(_agg_places);
+ for (auto agg_place : _agg_places) {
+ delete [] agg_place;
+ }
}
};
class RowInBlockComparator {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
index ce78397794..f90515fd5e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp
@@ -20,29 +20,38 @@
namespace doris::vectorized {
// auto spread at nullable condition, null value do not participate aggregate
-void register_aggregate_function_reader(AggregateFunctionSimpleFactory&
factory) {
+void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory&
factory) {
// add a suffix to the function name here to distinguish special functions
of agg reader
- auto register_function_reader = [&](const std::string& name,
- const AggregateFunctionCreator&
creator) {
- factory.register_function(name + agg_reader_suffix, creator, false);
+ auto register_function = [&](const std::string& name,
+ const AggregateFunctionCreator& creator) {
+ factory.register_function(name + AGG_READER_SUFFIX, creator, false);
+ factory.register_function(name + AGG_LOAD_SUFFIX, creator, false);
};
- register_function_reader("sum", create_aggregate_function_sum_reader);
- register_function_reader("max", create_aggregate_function_max);
- register_function_reader("min", create_aggregate_function_min);
- register_function_reader("replace_if_not_null",
create_aggregate_function_replace_if_not_null);
- register_function_reader("bitmap_union",
create_aggregate_function_bitmap_union);
- register_function_reader("hll_union",
create_aggregate_function_HLL_union<false>);
+ register_function("sum", create_aggregate_function_sum_reader);
+ register_function("max", create_aggregate_function_max);
+ register_function("min", create_aggregate_function_min);
+ register_function("bitmap_union", create_aggregate_function_bitmap_union);
+ register_function("hll_union", create_aggregate_function_HLL_union<false>);
}
-void
register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory&
factory) {
- auto register_function_reader = [&](const std::string& name,
- const AggregateFunctionCreator&
creator, bool nullable) {
- factory.register_function(name + agg_reader_suffix, creator, nullable);
+// only replace funtion in load/reader do different agg operation.
+// because Doris can ensure that the data is globally ordered in reader, but
cannot in load
+// 1. reader, get the first value of input data.
+// 2. load, get the last value of input data.
+void
register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory&
factory) {
+ auto register_function = [&](const std::string& name, const std::string&
suffix,
+ const AggregateFunctionCreator& creator, bool
nullable) {
+ factory.register_function(name + suffix, creator, nullable);
};
- register_function_reader("replace", create_aggregate_function_replace,
false);
- register_function_reader("replace",
create_aggregate_function_replace_nullable, true);
+ register_function("replace", AGG_READER_SUFFIX,
create_aggregate_function_first<false, true>, false);
+ register_function("replace", AGG_READER_SUFFIX,
create_aggregate_function_first<true, true>, true);
+ register_function("replace", AGG_LOAD_SUFFIX,
create_aggregate_function_last<false, true>, false);
+ register_function("replace", AGG_LOAD_SUFFIX,
create_aggregate_function_last<true, true>, true);
+
+ register_function("replace_if_not_null", AGG_READER_SUFFIX,
create_aggregate_function_first<false, true>, false);
+ register_function("replace_if_not_null", AGG_LOAD_SUFFIX,
create_aggregate_function_last<false, true>, false);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h
b/be/src/vec/aggregate_functions/aggregate_function_reader.h
index f44be5ee57..86fea6f079 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_reader.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h
@@ -26,10 +26,11 @@
namespace doris::vectorized {
-static const std::string agg_reader_suffix = "_reader";
+static auto constexpr AGG_READER_SUFFIX = "_reader";
+static auto constexpr AGG_LOAD_SUFFIX = "_load";
-void register_aggregate_function_reader(AggregateFunctionSimpleFactory&
factory);
+void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory&
factory);
-void
register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory&
factory);
+void
register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory&
factory);
} // namespace doris::vectorized
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index fcf333c1bd..799f3cbb6a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -56,7 +56,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_uniq(instance);
register_aggregate_function_bitmap(instance);
register_aggregate_function_combinator_distinct(instance);
- register_aggregate_function_reader(instance); // register aggregate
function for agg reader
+ register_aggregate_function_reader_load(instance); // register
aggregate function for agg reader
register_aggregate_function_window_rank(instance);
register_aggregate_function_stddev_variance_pop(instance);
register_aggregate_function_topn(instance);
@@ -70,7 +70,7 @@ AggregateFunctionSimpleFactory&
AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_combinator_null(instance);
register_aggregate_function_stddev_variance_samp(instance);
- register_aggregate_function_reader_no_spread(instance);
+ register_aggregate_function_replace_reader_load(instance);
register_aggregate_function_window_lead_lag(instance);
register_aggregate_function_HLL_union_agg(instance);
register_aggregate_function_percentile_approx(instance);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
index d4b7f99121..53a4c4931c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp
@@ -23,7 +23,7 @@
#include "common/logging.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/aggregate_functions/factory_helpers.h"
-#include "vec/aggregate_functions/helpers.h"
+
namespace doris::vectorized {
AggregateFunctionPtr create_aggregate_function_dense_rank(const std::string&
name,
@@ -53,44 +53,6 @@ AggregateFunctionPtr
create_aggregate_function_row_number(const std::string& nam
return std::make_shared<WindowFunctionRowNumber>(argument_types);
}
-template <template <typename> class AggregateFunctionTemplate, template
<typename> class Data,
- bool is_nullable, bool is_copy = false>
-static IAggregateFunction* create_function_single_value(const String& name,
- const DataTypes&
argument_types,
- const Array&
parameters) {
- using StoreType = CopiedValue;
-
- assert_arity_at_most<3>(name, argument_types);
-
- auto type = argument_types[0].get();
- if (type->is_nullable()) {
- type = assert_cast<const
DataTypeNullable*>(type)->get_nested_type().get();
- }
- WhichDataType which(*type);
-
-#define DISPATCH(TYPE) \
- if (which.idx == TypeIndex::TYPE) \
- return new AggregateFunctionTemplate< \
- Data<LeadAndLagData<TYPE, is_nullable, false,
StoreType>>>(argument_types);
- FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
-
- if (which.is_decimal()) {
- return new AggregateFunctionTemplate<
- Data<LeadAndLagData<Int128, is_nullable, false,
StoreType>>>(argument_types);
- }
- if (which.is_date_or_datetime()) {
- return new AggregateFunctionTemplate<
- Data<LeadAndLagData<Int64, is_nullable, false,
StoreType>>>(argument_types);
- }
- if (which.is_string_or_fixed_string()) {
- return new AggregateFunctionTemplate<
- Data<LeadAndLagData<StringRef, is_nullable, true,
StoreType>>>(argument_types);
- }
- DCHECK(false) << "with unknowed type, failed in
create_aggregate_function_leadlag";
- return nullptr;
-}
-
template <bool is_nullable>
AggregateFunctionPtr create_aggregate_function_lag(const std::string& name,
const DataTypes&
argument_types,
@@ -111,53 +73,6 @@ AggregateFunctionPtr create_aggregate_function_lead(const
std::string& name,
name, argument_types, parameters));
}
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_first(const std::string& name,
- const DataTypes&
argument_types,
- const Array& parameters,
- const bool
result_is_nullable) {
- return AggregateFunctionPtr(
- create_function_single_value<WindowFunctionData,
WindowFunctionFirstData, is_nullable>(
- name, argument_types, parameters));
-}
-
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_last(const std::string& name,
- const DataTypes&
argument_types,
- const Array& parameters,
- const bool
result_is_nullable) {
- return AggregateFunctionPtr(
- create_function_single_value<WindowFunctionData,
WindowFunctionLastData, is_nullable>(
- name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const
std::string& name,
- const
DataTypes& argument_types,
- const
Array& parameters,
- const bool
result_is_nullable) {
- return AggregateFunctionPtr(
- create_function_single_value<WindowFunctionData,
WindowFunctionFirstData, false, true>(
- name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace(const std::string& name,
- const DataTypes&
argument_types,
- const Array& parameters,
- const bool
result_is_nullable) {
- return AggregateFunctionPtr(
- create_function_single_value<WindowFunctionData,
WindowFunctionFirstData, false, true>(
- name, argument_types, parameters));
-}
-
-AggregateFunctionPtr create_aggregate_function_replace_nullable(const
std::string& name,
- const
DataTypes& argument_types,
- const Array&
parameters,
- const bool
result_is_nullable) {
- return AggregateFunctionPtr(
- create_function_single_value<WindowFunctionData,
WindowFunctionFirstData, true, true>(
- name, argument_types, parameters));
-}
-
void register_aggregate_function_window_rank(AggregateFunctionSimpleFactory&
factory) {
factory.register_function("dense_rank",
create_aggregate_function_dense_rank);
factory.register_function("rank", create_aggregate_function_rank);
@@ -169,9 +84,10 @@ void
register_aggregate_function_window_lead_lag(AggregateFunctionSimpleFactory&
factory.register_function("lead", create_aggregate_function_lead<true>,
true);
factory.register_function("lag", create_aggregate_function_lag<false>);
factory.register_function("lag", create_aggregate_function_lag<true>,
true);
- factory.register_function("first_value",
create_aggregate_function_first<false>);
- factory.register_function("first_value",
create_aggregate_function_first<true>, true);
- factory.register_function("last_value",
create_aggregate_function_last<false>);
- factory.register_function("last_value",
create_aggregate_function_last<true>, true);
+ factory.register_function("first_value",
create_aggregate_function_first<false, false>);
+ factory.register_function("first_value",
create_aggregate_function_first<true, false>, true);
+ factory.register_function("last_value",
create_aggregate_function_last<false, false>);
+ factory.register_function("last_value",
create_aggregate_function_last<true, false>, true);
}
+
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 133efe7ea0..c438cd3582 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -21,11 +21,13 @@
#pragma once
#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/helpers.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/io/io_helper.h"
+#include "factory_helpers.h"
namespace doris::vectorized {
@@ -405,19 +407,62 @@ private:
DataTypePtr _argument_type;
};
-AggregateFunctionPtr create_aggregate_function_replace_if_not_null(const
std::string& name,
- const
DataTypes& argument_types,
- const
Array& parameters,
- const bool
result_is_nullable);
+template <template <typename> class AggregateFunctionTemplate, template
<typename> class Data,
+ bool is_nullable, bool is_copy = false>
+static IAggregateFunction* create_function_single_value(const String& name,
+ const DataTypes&
argument_types,
+ const Array&
parameters) {
+ using StoreType = std::conditional_t<is_copy, CopiedValue, Value>;
-AggregateFunctionPtr create_aggregate_function_replace(const std::string& name,
- const DataTypes&
argument_types,
- const Array& parameters,
- const bool
result_is_nullable);
+ assert_arity_at_most<3>(name, argument_types);
-AggregateFunctionPtr create_aggregate_function_replace_nullable(const
std::string& name,
- const
DataTypes& argument_types,
- const Array&
parameters,
- const bool
result_is_nullable);
+ auto type = argument_types[0].get();
+ if (type->is_nullable()) {
+ type = assert_cast<const
DataTypeNullable*>(type)->get_nested_type().get();
+ }
+ WhichDataType which(*type);
+
+#define DISPATCH(TYPE) \
+ if (which.idx == TypeIndex::TYPE) \
+ return new AggregateFunctionTemplate< \
+ Data<LeadAndLagData<TYPE, is_nullable, false,
StoreType>>>(argument_types);
+ FOR_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+
+ if (which.is_decimal()) {
+ return new AggregateFunctionTemplate<
+ Data<LeadAndLagData<Int128, is_nullable, false,
StoreType>>>(argument_types);
+ }
+ if (which.is_date_or_datetime()) {
+ return new AggregateFunctionTemplate<
+ Data<LeadAndLagData<Int64, is_nullable, false,
StoreType>>>(argument_types);
+ }
+ if (which.is_string_or_fixed_string()) {
+ return new AggregateFunctionTemplate<
+ Data<LeadAndLagData<StringRef, is_nullable, true,
StoreType>>>(argument_types);
+ }
+ DCHECK(false) << "with unknowed type, failed in
create_aggregate_function_leadlag";
+ return nullptr;
+}
+
+template <bool is_nullable, bool is_copy>
+AggregateFunctionPtr create_aggregate_function_first(const std::string& name,
+ const DataTypes&
argument_types,
+ const Array& parameters,
+ bool result_is_nullable) {
+ return AggregateFunctionPtr(
+ create_function_single_value<WindowFunctionData,
WindowFunctionFirstData, is_nullable, is_copy>(
+ name, argument_types, parameters));
+}
+
+template <bool is_nullable, bool is_copy>
+AggregateFunctionPtr create_aggregate_function_last(const std::string& name,
+ const DataTypes&
argument_types,
+ const Array& parameters,
+ bool result_is_nullable) {
+ return AggregateFunctionPtr(
+ create_function_single_value<WindowFunctionData,
WindowFunctionLastData, is_nullable, is_copy>(
+ name, argument_types, parameters));
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 1959fc70fa..a24fb4ee77 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -91,7 +91,7 @@ void BlockReader::_init_agg_state(const ReaderParams&
read_params) {
.column(read_params.origin_return_columns->at(_return_columns_loc[idx]))
.aggregation();
std::string agg_name =
- TabletColumn::get_string_by_aggregation_type(agg_method) +
agg_reader_suffix;
+ TabletColumn::get_string_by_aggregation_type(agg_method) +
AGG_READER_SUFFIX;
std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(),
[](unsigned char c) { return std::tolower(c); });
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 54a361044e..997b619bd5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -178,18 +178,28 @@ Status VNodeChannel::add_row(BlockRow& block_row, int64_t
tablet_id) {
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token) {
- auto st = none_of({_cancelled, _send_finished});
+ auto st = none_of({_cancelled, _send_finished});
if (!st.ok()) {
return 0;
}
- bool is_finished = true;
- if (!_add_block_closure->is_packet_in_flight() && _pending_batches_num > 0
&&
- _last_patch_processed_finished.compare_exchange_strong(is_finished,
false)) {
+
+ if (!_add_block_closure->try_set_in_flight()) {
+ return _send_finished ? 0 : 1;
+ }
+
+ // We are sure that try_send_batch is not running
+ if (_pending_batches_num > 0) {
auto s = thread_pool_token->submit_func(
- std::bind(&VNodeChannel::try_send_block, this, state));
+ std::bind(&VNodeChannel::try_send_block, this, state));
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool
failed");
+ // clear in flight
+ _add_block_closure->clear_in_flight();
}
+ // in_flight is cleared in closure::Run
+ } else {
+ // clear in flight
+ _add_block_closure->clear_in_flight();
}
return _send_finished ? 0 : 1;
}
@@ -221,6 +231,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
&compressed_bytes, &_column_values_buffer);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
+ _add_block_closure->clear_in_flight();
return;
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -234,6 +245,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
+ _add_block_closure->clear_in_flight();
return;
} else {
remain_ms = config::min_load_rpc_timeout_ms;
@@ -266,7 +278,6 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
_add_block_closure);
_next_packet_seq++;
- _last_patch_processed_finished = true;
}
void VNodeChannel::_close_check() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]