TsukiokaKogane commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3381645717
##########
be/src/storage/iterator/block_reader.cpp:
##########
@@ -74,6 +77,302 @@ Status BlockReader::next_block_with_aggregation(Block*
block, bool* eof) {
return res;
}
+Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
+ if (_binlog_column_pos_inited) {
+ if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
+ src_block.get_by_position(_binlog_op_pos).name ==
kRowBinlogOpColName) {
+ return Status::OK();
+ }
+ _binlog_op_pos = -1;
+ _binlog_lsn_pos = -1;
+ _binlog_timestamp_pos = -1;
+ _binlog_column_pos_inited = false;
+ }
+
+ const uint32_t col_num = src_block.columns();
+ _before_column_idx.resize(col_num);
+ for (uint32_t i = 0; i < col_num; ++i) {
+ const auto& name = src_block.get_by_position(i).name;
+ if (name == kRowBinlogOpColName) {
+ _binlog_op_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogLsnColName) {
+ _binlog_lsn_pos = static_cast<int>(i);
+ } else if (name == kRowBinlogTimestampColName) {
+ _binlog_timestamp_pos = static_cast<int>(i);
+ } else {
+ std::string before_name = build_before_column_name(name);
+ int tmp_idx = src_block.get_position_by_name(before_name);
+ _before_column_idx[i] = tmp_idx < 0 ? i : tmp_idx;
+ }
+ }
+ _binlog_column_pos_inited = true;
+ return Status::OK();
+}
+
+int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
+ const IColumn* cur = &col;
+ if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
+ if (nullable->is_null_at(row)) {
+ return ROW_BINLOG_UNKNOWN;
+ }
+ cur = &nullable->get_nested_column();
+ }
+
+ if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
+ return int64_col->get_element(row);
+ }
+
+ return ROW_BINLOG_UNKNOWN;
+}
+
+Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
+ IColumn* cur = &col;
+ ColumnNullable* nullable = nullptr;
+ if (auto* n = typeid_cast<ColumnNullable*>(cur)) {
+ nullable = n;
+ cur = &nullable->get_nested_column();
+ }
+
+ if (auto* int64_col = typeid_cast<ColumnInt64*>(cur)) {
+ int64_col->insert_value(op);
+ } else {
+ return Status::InternalError("invalid column type");
+ }
+
+ if (nullable != nullptr) {
+ nullable->get_null_map_data().push_back(0);
+ }
+ return Status::OK();
+}
+
+bool BlockReader::_is_binlog_meta_column(int idx) const {
+ return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx ==
_binlog_timestamp_pos;
+}
+
+int BlockReader::_resolve_source_column_index(int idx, bool use_before) const {
+ if (!use_before || _is_binlog_meta_column(idx)) {
+ return idx;
+ }
+
+ return _before_column_idx[idx];
+}
+
+void BlockReader::_init_pending_row_columns(const Block& block) {
+ if (!_pending_row_columns.empty()) {
+ return;
+ }
+ _pending_row_columns = block.clone_empty_columns();
+}
+
+bool BlockReader::_emit_pending_row(MutableColumns& target_columns, size_t&
output_row_count) {
+ if (!_has_pending_row) {
+ return false;
+ }
+ for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
+ target_columns[i]->insert_from(*_pending_row_columns[i], 0);
+ _pending_row_columns[i]->clear();
+ }
+ _has_pending_row = false;
+ output_row_count++;
+ return true;
+}
+
+Status BlockReader::_append_change_row(MutableColumns& target_columns, const
Block& src_block,
+ size_t row_pos, int64_t output_op, bool
use_before) {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (target_col_idx < 0) {
+ continue;
+ }
+ if (idx == _binlog_op_pos) {
+ RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
output_op));
+ continue;
+ }
+ int source_idx = _resolve_source_column_index(idx, use_before);
+
target_columns[target_col_idx]->insert_from(*src_block.get_by_position(source_idx).column,
+ row_pos);
+ }
+ return Status::OK();
+}
+
+Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
+ if (UNLIKELY(_eof && !_has_pending_row)) {
+ *eof = true;
+ return Status::OK();
+ }
+
+ if (_stored_data_columns.empty()) {
+ _stored_data_columns = _next_row.block->clone_empty_columns();
+ }
+
+ auto target_columns_guard = block->mutate_columns_scoped();
+ auto& target_columns = target_columns_guard.mutable_columns();
+ size_t output_row_count = 0;
+ _init_pending_row_columns(*block);
+ RETURN_IF_ERROR(_ensure_binlog_column_pos(*_next_row.block));
+ while (output_row_count < batch_max_rows()) {
+ if (_emit_pending_row(target_columns, output_row_count)) {
+ continue;
+ }
+ if (_eof) {
+ break;
+ }
+ bool need_pop = _stored_data_columns[0]->size() > 1;
+ for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
+ if (need_pop) {
+ _stored_data_columns[i]->pop_back(1);
+ }
+
_stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
+ _next_row.row_pos);
+ }
+ auto res = _vcollect_iter.next(&_next_row);
+ if (UNLIKELY(res.is<END_OF_FILE>())) {
+ _eof = true;
+ } else if (UNLIKELY(!res.ok())) {
+ return res;
+ }
+
+ if (!_eof && _next_row.is_same) {
+ continue;
+ }
+ size_t group_size = _stored_data_columns[0]->size();
+ auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos],
0);
+ auto last_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos],
group_size - 1);
+ auto result = AggregateFunctionMinDelta::calculate_result(first_op,
last_op);
+ switch (result) {
+ case AggregateFunctionMinDelta::ResultType::SKIP:
+ break;
+ case AggregateFunctionMinDelta::ResultType::INSERT:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_INSERT));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::DELETE:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+ STREAM_CHANGE_DELETE));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ }
+ }
+ output_row_count++;
+ break;
+ case AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_BEFORE));
+ } else if (idx == _binlog_lsn_pos) {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size -
1);
+ } else {
+ int source_idx = _resolve_source_column_index(idx, true);
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
+ 0);
+ }
+ }
+ output_row_count++;
+ if (output_row_count >= batch_max_rows()) {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+ _pending_row_columns[target_col_idx]->insert_from(
+ *_stored_data_columns[idx], group_size - 1);
+ }
+ }
+ _has_pending_row = true;
+ } else {
+ for (auto idx : _normal_columns_idx) {
+ int target_col_idx = _return_columns_loc[idx];
+ if (idx == _binlog_op_pos) {
+
RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
+
STREAM_CHANGE_UPDATE_AFTER));
+ } else {
+
target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
+ group_size
- 1);
+ }
+ }
+ output_row_count++;
+ }
+ break;
+ }
+
+ for (auto& col : _stored_data_columns) {
+ col->clear();
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]