github-actions[bot] commented on code in PR #27076:
URL: https://github.com/apache/doris/pull/27076#discussion_r1406960544
##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -453,11 +448,14 @@ void PrefetchBuffer::prefetch_buffer() {
buf_size = merge_small_ranges(_offset, read_range_index);
}
+ _len = 0;
+ Status s;
+
{
SCOPED_RAW_TIMER(&_statis.read_time);
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len,
_io_ctx);
}
- if (UNLIKELY(buf_size != _len)) {
+ if (UNLIKELY(s.ok() && buf_size != _len)) {
Review Comment:
warning: boolean expression can be simplified by DeMorgan's theorem
[readability-simplify-boolean-expr]
```cpp
if (UNLIKELY(s.ok() && buf_size != _len)) {
^
```
<details>
<summary>Additional context</summary>
**be/src/common/compiler_util.h:35:** expanded from macro 'UNLIKELY'
```cpp
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
^
```
</details>
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +80,78 @@
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ LOG(INFO) << "add wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id <<
",status:" << wal_status;
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ std::unordered_map<int64_t, WAL_STATUS> tmp;
+ tmp.emplace(wal_id, wal_status);
+ _wal_status_queues.emplace(table_id, tmp);
+ } else {
+ it->second.emplace(wal_id, wal_status);
+ }
+}
+
+Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
Review Comment:
warning: method 'erase_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:64:
```diff
- Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
+ static Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
```
##########
be/src/olap/wal_writer.h:
##########
@@ -18,43 +18,50 @@
#pragma once
#include <atomic>
+#include <condition_variable>
#include <memory>
+#include <mutex>
#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "io/fs/file_reader_writer_fwd.h"
-#include "util/lock.h"
namespace doris {
using PBlockArray = std::vector<PBlock*>;
+extern const char* k_wal_magic;
+extern const uint32_t k_wal_magic_length;
class WalWriter {
public:
explicit WalWriter(const std::string& file_name,
- const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes);
+ const std::shared_ptr<std::atomic_size_t>&
all_wal_disk_bytes,
+ const std::shared_ptr<std::condition_variable>& cv);
~WalWriter();
Status init();
Status finalize();
Status append_blocks(const PBlockArray& blocks);
size_t disk_bytes() const { return
_disk_bytes.load(std::memory_order_relaxed); };
+ Status append_header(uint32_t version, std::string col_ids);
std::string file_name() { return _file_name; };
+
+public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_writer.h:35:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/pipeline/exec/analytic_source_operator.cpp:
##########
@@ -36,8 +37,129 @@
_agg_functions_size(0),
_agg_functions_created(false) {}
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx,
+
vectorized::BlockRowPos start,
+
vectorized::BlockRowPos end,
+ bool
need_check_first) {
+ auto& shared_state = *_shared_state;
+ int64_t start_init_row_num = start.row_num;
+ vectorized::ColumnPtr start_column =
+
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
+ vectorized::ColumnPtr start_next_block_column = start_column;
+
+ DCHECK_LE(start.block_num, end.block_num);
+ DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
+ int64_t start_block_num = start.block_num;
+ int64_t end_block_num = end.block_num;
+ int64_t mid_blcok_num = end.block_num;
+ // To fix this problem: https://github.com/apache/doris/issues/15951
+ // in this case, the partition by column is last row of block, so it's
pointed to a new block at row = 0, range is: [left, right)
+ // From the perspective of order by column, the two values are exactly
equal.
+ // so the range will be get wrong because it's compare_at == 0 with next
block at row = 0
+ if (need_check_first && end.block_num > 0 && end.row_num == 0) {
+ end.block_num--;
+ end_block_num--;
+ end.row_num = shared_state.input_blocks[end_block_num].rows();
+ }
+ //binary search find in which block
+ while (start_block_num < end_block_num) {
+ mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
+ start_next_block_column =
+
shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
+ //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0]
+ if (start_column->compare_at(start_init_row_num, 0,
*start_next_block_column, 1) == 0) {
+ start_block_num = mid_blcok_num;
+ } else {
+ end_block_num = mid_blcok_num - 1;
+ }
+ }
+
+ // have check the start.block_num: start_column[start_init_row_num] with
mid_blcok_num start_next_block_column[0]
+ // now next block must not be result, so need check with end_block_num:
start_next_block_column[last_row]
+ if (end_block_num == mid_blcok_num - 1) {
+ start_next_block_column =
+
shared_state.input_blocks[end_block_num].get_by_position(idx).column;
+ int64_t block_size = shared_state.input_blocks[end_block_num].rows();
+ if ((start_column->compare_at(start_init_row_num, block_size - 1,
*start_next_block_column,
+ 1) == 0)) {
+ start.block_num = end_block_num + 1;
+ start.row_num = 0;
+ return start;
+ }
+ }
+
+ //check whether need get column again, maybe same as first init
+ // if the start_block_num have move to forword, so need update start block
num and compare it from row_num=0
+ if (start_block_num != start.block_num) {
+ start_init_row_num = 0;
+ start.block_num = start_block_num;
+ start_column =
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
+ }
+ //binary search, set start and end pos
+ int64_t start_pos = start_init_row_num;
+ int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
+ //if end_block_num haven't moved, only start_block_num go to the end block
+ //so could use the end.row_num for binary search
+ if (start.block_num == end.block_num) {
+ end_pos = end.row_num;
+ }
+ while (start_pos < end_pos) {
+ int64_t mid_pos = (start_pos + end_pos) >> 1;
+ if (start_column->compare_at(start_init_row_num, mid_pos,
*start_column, 1)) {
+ end_pos = mid_pos;
+ } else {
+ start_pos = mid_pos + 1;
+ }
+ }
+ start.row_num = start_pos; //update row num, return the find end
+ return start;
+}
+
+vectorized::BlockRowPos AnalyticLocalState::_get_partition_by_end() {
+ auto& shared_state = *_shared_state;
+ if (shared_state.current_row_position <
+ shared_state.partition_by_end.pos) { //still have data, return
partition_by_end directly
+ return shared_state.partition_by_end;
+ }
+
+ if (shared_state.partition_by_eq_expr_ctxs.empty() ||
+ (shared_state.input_total_rows == 0)) { //no partition_by, the all
block is end
+ return shared_state.all_block_end;
+ }
+
+ vectorized::BlockRowPos cal_end = shared_state.all_block_end;
+ for (size_t i = 0; i < shared_state.partition_by_eq_expr_ctxs.size();
+ ++i) { //have partition_by, binary search the partiton end
+ cal_end =
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
+ shared_state.partition_by_end,
cal_end);
+ }
+ cal_end.pos =
shared_state.input_block_first_row_positions[cal_end.block_num] +
cal_end.row_num;
+ return cal_end;
+}
+
+bool AnalyticLocalState::_whether_need_next_partition(
Review Comment:
warning: method '_whether_need_next_partition' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_source_operator.h:96:
```diff
- bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);
+ static bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);
```
##########
be/src/agent/cgroup_cpu_ctl.cpp:
##########
@@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() {
return Status::OK();
}
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t*
cpu_hard_limit) {
Review Comment:
warning: method 'get_cgroup_cpu_info' can be made const
[readability-make-member-function-const]
```suggestion
void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) const {
```
be/src/agent/cgroup_cpu_ctl.h:50:
```diff
- void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
+ void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) const;
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +80,78 @@ Status WalManager::init() {
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
Review Comment:
warning: method 'add_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:63:
```diff
- void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status);
+ static void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status);
```
##########
be/src/olap/merger.cpp:
##########
@@ -363,4 +383,21 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr
tablet, ReaderType reader_
return Status::OK();
}
+void Merger::_generate_key_group_cluster_key_idxes(
Review Comment:
warning: method '_generate_key_group_cluster_key_idxes' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void Merger::_generate_key_group_cluster_key_idxes(
```
##########
be/src/olap/rowset/segment_v2/segment.cpp:
##########
@@ -401,16 +401,20 @@ Status Segment::new_inverted_index_iterator(const
TabletColumn& tablet_column,
return Status::OK();
}
-Status Segment::lookup_row_key(const Slice& key, bool with_seq_col,
RowLocation* row_location) {
+Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool
with_rowid,
Review Comment:
warning: function 'lookup_row_key' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
^
```
<details>
<summary>Additional context</summary>
**be/src/olap/rowset/segment_v2/segment.cpp:403:** 85 lines including
whitespace and comments (threshold 80)
```cpp
Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, bool with_rowid,
^
```
</details>
##########
be/src/olap/wal_manager.h:
##########
@@ -32,6 +35,13 @@
class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);
+public:
+ enum WAL_STATUS {
+ PREPARE = 0,
+ REPLAY,
+ CREATE,
+ };
+
public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_manager.h:37:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/olap/rowset/segment_v2/segment_writer.h:
##########
@@ -149,15 +149,29 @@ class SegmentWriter {
size_t pos);
// used for unique-key with merge on write and segment min_max key
std::string _full_encode_keys(
- const std::vector<vectorized::IOlapColumnDataAccessor*>&
key_columns, size_t pos);
+ const std::vector<vectorized::IOlapColumnDataAccessor*>&
key_columns, size_t pos,
+ bool null_first = true);
+
+ std::string _full_encode_keys(
+ const std::vector<const KeyCoder*>& key_coders,
+ const std::vector<vectorized::IOlapColumnDataAccessor*>&
key_columns, size_t pos,
+ bool null_first = true);
+
// used for unique-key with merge on write
void _encode_seq_column(const vectorized::IOlapColumnDataAccessor*
seq_column, size_t pos,
string* encoded_keys);
+ void _encode_rowid(const uint32_t rowid, string* encoded_keys);
Review Comment:
warning: parameter 'rowid' is const-qualified in the function declaration;
const-qualification of parameters only has an effect in function definitions
[readability-avoid-const-params-in-decls]
```suggestion
void _encode_rowid(uint32_t rowid, string* encoded_keys);
```
##########
be/src/olap/wal_manager.h:
##########
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -76,6 +80,78 @@
&_replay_thread);
}
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ LOG(INFO) << "add wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id <<
",status:" << wal_status;
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ std::unordered_map<int64_t, WAL_STATUS> tmp;
+ tmp.emplace(wal_id, wal_status);
+ _wal_status_queues.emplace(table_id, tmp);
+ } else {
+ it->second.emplace(wal_id, wal_status);
+ }
+}
+
+Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ auto it = _wal_status_queues.find(table_id);
+ LOG(INFO) << "remove wal queue "
+ << ",table_id:" << table_id << ",wal_id:" << wal_id;
+ if (it == _wal_status_queues.end()) {
+ return Status::InternalError("table_id " + std::to_string(table_id) +
+ " not found in wal status queue");
+ } else {
+ it->second.erase(wal_id);
+ if (it->second.empty()) {
+ _wal_status_queues.erase(table_id);
+ }
+ }
+ return Status::OK();
+}
+
+Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest*
request,
+ PGetWalQueueSizeResponse*
response) {
+ std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
+ size_t count = 0;
+ auto table_id = request->table_id();
+ auto txn_id = request->txn_id();
+ if (table_id > 0 && txn_id > 0) {
+ auto it = _wal_status_queues.find(table_id);
+ if (it == _wal_status_queues.end()) {
+ LOG(INFO) << ("table_id " + std::to_string(table_id) +
+ " not found in wal status queue");
+ } else {
+ for (auto wal_it = it->second.begin(); wal_it != it->second.end();
++wal_it) {
+ if (wal_it->first <= txn_id) {
+ count += 1;
+ }
+ }
+ }
+ } else {
+ for (auto it = _wal_status_queues.begin(); it !=
_wal_status_queues.end(); it++) {
+ count += it->second.size();
+ }
+ }
+ response->set_size(count);
+ if (count > 0) {
+ print_wal_status_queue();
+ }
+ return Status::OK();
+}
+
+void WalManager::print_wal_status_queue() {
Review Comment:
warning: method 'print_wal_status_queue' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:65:
```diff
- void print_wal_status_queue();
+ static void print_wal_status_queue();
```
##########
be/src/olap/wal_manager.cpp:
##########
@@ -276,4 +359,37 @@
return Status::OK();
}
+bool WalManager::is_running() {
+ return !_stop.load();
+}
+
+void WalManager::stop_relay_wal() {
+ std::lock_guard<std::shared_mutex> wrlock(_lock);
+ for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
+ it->second->stop();
+ }
+}
+
+void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index) {
+ _wal_column_id_map.emplace(wal_id, column_index);
+}
+
+void WalManager::erase_wal_column_index(int64_t wal_id) {
+ if (_wal_column_id_map.erase(wal_id)) {
+ LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
+ } else {
+ LOG(WARNING) << "fail to erase wal " << wal_id << " from
wal_column_id_map";
+ }
+}
+
+Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index) {
Review Comment:
warning: method 'get_wal_column_index' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
```
##########
be/src/olap/wal_reader.h:
##########
@@ -32,11 +32,13 @@ class WalReader {
Status finalize();
Status read_block(PBlock& block);
+ Status read_header(uint32_t& version, std::string& col_ids);
private:
Status _deserialize(PBlock& block, std::string& buf);
Status _check_checksum(const char* binary, size_t size, uint32_t checksum);
+private:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_reader.h:36:** previously declared here
```cpp
private:
^
```
</details>
##########
be/src/olap/wal_manager.cpp:
##########
@@ -276,4 +359,37 @@
return Status::OK();
}
+bool WalManager::is_running() {
+ return !_stop.load();
+}
+
+void WalManager::stop_relay_wal() {
+ std::lock_guard<std::shared_mutex> wrlock(_lock);
+ for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
+ it->second->stop();
+ }
+}
+
+void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index) {
+ _wal_column_id_map.emplace(wal_id, column_index);
+}
+
+void WalManager::erase_wal_column_index(int64_t wal_id) {
Review Comment:
warning: method 'erase_wal_column_index' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_manager.h:70:
```diff
- void erase_wal_column_index(int64_t wal_id);
+ static void erase_wal_column_index(int64_t wal_id);
```
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -301,15 +304,37 @@
return Status::OK();
}
+Status AggLocalState::_merge_spilt_data() {
Review Comment:
warning: method '_merge_spilt_data' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_source_operator.h:111:
```diff
- Status _merge_spilt_data();
+ static Status _merge_spilt_data();
```
##########
be/src/olap/wal_table.h:
##########
@@ -31,17 +31,25 @@ class WalTable {
public:
WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id);
~WalTable();
- // <retry_num, start_time_ms, is_doing_replay>
- using replay_wal_info = std::tuple<int64_t, int64_t, bool>;
// used when be start and there are wals need to do recovery
void add_wals(std::vector<std::string> wals);
Status replay_wals();
size_t size();
+ void stop();
+
+public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
<details>
<summary>Additional context</summary>
**be/src/olap/wal_table.h:30:** previously declared here
```cpp
public:
^
```
</details>
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -376,9 +401,9 @@
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.hash_table->template
get_null_key_data<
vectorized::AggregateDataPtr>();
- for (size_t i = 0; i <
_shared_state->aggregate_evaluators.size(); ++i)
-
_shared_state->aggregate_evaluators[i]->insert_result_info(
- mapped +
_dependency->offsets_of_aggregate_states()[i],
+ for (size_t i = 0; i <
shared_state.aggregate_evaluators.size(); ++i)
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
for (size_t i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
```
be/src/pipeline/exec/aggregation_source_operator.cpp:406:
```diff
- value_columns[i].get());
+ value_columns[i].get());
+ }
```
##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -66,6 +66,126 @@
return Status::OK();
}
+bool AnalyticSinkLocalState::_whether_need_next_partition(
+ vectorized::BlockRowPos& found_partition_end) {
+ auto& shared_state = *_shared_state;
+ if (shared_state.input_eos ||
+ (shared_state.current_row_position <
+ shared_state.partition_by_end.pos)) { //now still have partition data
+ return false;
+ }
+ if ((shared_state.partition_by_eq_expr_ctxs.empty() &&
!shared_state.input_eos) ||
+ (found_partition_end.pos == 0)) { //no partition, get until fetch to
EOS
+ return true;
+ }
+ if (!shared_state.partition_by_eq_expr_ctxs.empty() &&
+ found_partition_end.pos == shared_state.all_block_end.pos &&
+ !shared_state.input_eos) { //current partition data calculate done
+ return true;
+ }
+ return false;
+}
+
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(
Review Comment:
warning: method '_compare_row_to_find_end' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_sink_operator.h:81:
```diff
- vectorized::BlockRowPos _compare_row_to_find_end(int idx, vectorized::BlockRowPos start,
+ static vectorized::BlockRowPos _compare_row_to_find_end(int idx, vectorized::BlockRowPos start,
```
##########
be/src/pipeline/exec/analytic_sink_operator.h:
##########
@@ -45,20 +45,45 @@ class AnalyticSinkOperator final : public
StreamingOperator<AnalyticSinkOperator
bool can_write() override { return _node->can_write(); }
};
+class AnalyticSinkDependency final : public Dependency {
+public:
+ using SharedState = AnalyticSharedState;
+ AnalyticSinkDependency(int id, int node_id, QueryContext* query_ctx)
+ : Dependency(id, node_id, "AnalyticSinkDependency", true,
query_ctx) {}
+ ~AnalyticSinkDependency() override = default;
+};
+
class AnalyticSinkOperatorX;
-class AnalyticSinkLocalState : public
PipelineXSinkLocalState<AnalyticDependency> {
+class AnalyticSinkLocalState : public
PipelineXSinkLocalState<AnalyticSinkDependency> {
ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
public:
AnalyticSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : PipelineXSinkLocalState<AnalyticDependency>(parent, state) {}
+ : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
private:
friend class AnalyticSinkOperatorX;
+ bool _refresh_need_more_input() {
Review Comment:
warning: method '_refresh_need_more_input' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static bool _refresh_need_more_input() {
```
##########
be/src/olap/wal_writer.cpp:
##########
@@ -53,40 +59,76 @@ Status WalWriter::finalize() {
Status WalWriter::append_blocks(const PBlockArray& blocks) {
{
- std::unique_lock l(_mutex);
- while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
config::wal_max_disk_size) {
- cv.wait_for(l,
std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
+ if (_is_first_append_blocks) {
+ _is_first_append_blocks = false;
+ std::unique_lock l(_mutex);
+ while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
+ config::wal_max_disk_size) {
+ LOG(INFO) << "First time to append blocks to wal file " <<
_file_name
+ << ". Currently, all wal disk space usage is "
+ <<
_all_wal_disk_bytes->load(std::memory_order_relaxed)
+ << ", larger than the maximum limit " <<
config::wal_max_disk_size
+ << ", so we need to wait. When any other load
finished, that wal will be "
+ "removed, the space used by that wal will be
free.";
+ cv->wait_for(l,
std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
+ }
}
}
size_t total_size = 0;
for (const auto& block : blocks) {
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE;
}
- std::string binary(total_size, '\0');
- char* row_binary = binary.data();
size_t offset = 0;
for (const auto& block : blocks) {
- unsigned long row_length = block->GetCachedSize();
- memcpy(row_binary + offset, &row_length, LENGTH_SIZE);
+ uint8_t len_buf[sizeof(uint64_t)];
+ uint64_t block_length = block->ByteSizeLong();
+ encode_fixed64_le(len_buf, block_length);
+ RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));
offset += LENGTH_SIZE;
- memcpy(row_binary + offset, block->SerializeAsString().data(),
row_length);
- offset += row_length;
- uint32_t checksum = crc32c::Value(block->SerializeAsString().data(),
row_length);
- memcpy(row_binary + offset, &checksum, CHECKSUM_SIZE);
+ std::string content = block->SerializeAsString();
+ RETURN_IF_ERROR(_file_writer->append(content));
+ offset += block_length;
+ uint8_t checksum_buf[sizeof(uint32_t)];
+ uint32_t checksum = crc32c::Value(content.data(), block_length);
+ encode_fixed32_le(checksum_buf, checksum);
+ RETURN_IF_ERROR(_file_writer->append({checksum_buf,
sizeof(uint32_t)}));
offset += CHECKSUM_SIZE;
}
- DCHECK(offset == total_size);
- _disk_bytes.store(_disk_bytes.fetch_add(total_size,
std::memory_order_relaxed),
- std::memory_order_relaxed);
- _all_wal_disk_bytes->store(
- _all_wal_disk_bytes->fetch_add(total_size,
std::memory_order_relaxed),
- std::memory_order_relaxed);
- // write rows
- RETURN_IF_ERROR(_file_writer->append({row_binary, offset}));
- _count++;
- if (_count % _batch == 0) {
- //todo sync data
- //LOG(INFO) << "count=" << count << ",do sync";
+ if (offset != total_size) {
+ return Status::InternalError(
+ "failed to write block to wal expected= " +
std::to_string(total_size) +
+ ",actually=" + std::to_string(offset));
+ }
+ _disk_bytes.fetch_add(total_size, std::memory_order_relaxed);
+ _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed);
+ return Status::OK();
+}
+
+Status WalWriter::append_header(uint32_t version, std::string col_ids) {
Review Comment:
warning: method 'append_header' can be made static
[readability-convert-member-functions-to-static]
be/src/olap/wal_writer.h:46:
```diff
- Status append_header(uint32_t version, std::string col_ids);
+ static Status append_header(uint32_t version, std::string col_ids);
```
##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -66,6 +66,126 @@ Status AnalyticSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
return Status::OK();
}
+bool AnalyticSinkLocalState::_whether_need_next_partition(
Review Comment:
warning: method '_whether_need_next_partition' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_sink_operator.h:84:
```diff
- bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);
+ static bool _whether_need_next_partition(vectorized::BlockRowPos& found_partition_end);
```
##########
be/src/pipeline/exec/aggregation_source_operator.h:
##########
@@ -90,6 +107,19 @@ class AggLocalState final : public
PipelineXLocalState<AggDependency> {
Status _serialize_with_serialized_key_result_with_spilt_data(RuntimeState*
state,
vectorized::Block* block,
SourceState&
source_state);
+ Status _destroy_agg_status(vectorized::AggregateDataPtr data);
+ Status _reset_hash_table();
+ Status _merge_spilt_data();
+ void _make_nullable_output_key(vectorized::Block* block) {
Review Comment:
warning: method '_make_nullable_output_key' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void _make_nullable_output_key(vectorized::Block* block) {
```
##########
be/src/pipeline/exec/aggregation_source_operator.cpp:
##########
@@ -72,44 +73,19 @@ Status AggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
&AggLocalState::_serialize_with_serialized_key_result,
this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
}
- _executor.close =
std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
}
- _agg_data_created_without_key = p._without_key;
+ _shared_state->agg_data_created_without_key = p._without_key;
return Status::OK();
}
-void AggLocalState::_close_with_serialized_key() {
- std::visit(
- [&](auto&& agg_method) -> void {
- auto& data = *agg_method.hash_table;
- data.for_each_mapped([&](auto& mapped) {
- if (mapped) {
-
static_cast<void>(_dependency->destroy_agg_status(mapped));
- mapped = nullptr;
- }
- });
- if (data.has_null_key_data()) {
- auto st = _dependency->destroy_agg_status(
- data.template
get_null_key_data<vectorized::AggregateDataPtr>());
- if (!st) {
- throw Exception(st.code(), st.to_string());
- }
- }
- },
- _agg_data->method_variant);
- _dependency->release_tracker();
-}
-
-void AggLocalState::_close_without_key() {
- //because prepare maybe failed, and couldn't create agg data.
- //but finally call close to destory agg data, if agg data has bitmapValue
- //will be core dump, it's not initialized
- if (_agg_data_created_without_key) {
-
static_cast<void>(_dependency->destroy_agg_status(_agg_data->without_key));
- _agg_data_created_without_key = false;
+Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
Review Comment:
warning: method '_destroy_agg_status' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/aggregation_source_operator.h:109:
```diff
- Status _destroy_agg_status(vectorized::AggregateDataPtr data);
+ static Status _destroy_agg_status(vectorized::AggregateDataPtr data);
```
##########
be/src/pipeline/exec/analytic_source_operator.cpp:
##########
@@ -36,8 +37,129 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state,
OperatorXBase* paren
_agg_functions_size(0),
_agg_functions_created(false) {}
+//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
+vectorized::BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx,
Review Comment:
warning: method '_compare_row_to_find_end' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/analytic_source_operator.h:93:
```diff
- vectorized::BlockRowPos _compare_row_to_find_end(int idx, vectorized::BlockRowPos start,
+ static vectorized::BlockRowPos _compare_row_to_find_end(int idx, vectorized::BlockRowPos start,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]