jacktengg commented on code in PR #61212:
URL: https://github.com/apache/doris/pull/61212#discussion_r2937759800
##########
be/src/exec/operator/partitioned_aggregation_sink_operator.cpp:
##########
@@ -325,149 +352,93 @@ Status
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
HashTableType&
hash_table,
const size_t
size_to_revoke, bool eos) {
Status status;
- Defer defer {[&]() {
- if (!status.ok()) {
- Base::_shared_state->close();
- }
- }};
context.init_iterator();
+ auto& parent = _parent->template cast<PartitionedAggSinkOperatorX>();
-
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
- const auto total_rows =
-
Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();
+ const auto total_rows =
parent._agg_sink_operator->get_hash_table_size(_runtime_state.get());
+
+ if (total_rows == 0) {
+ return Status::OK();
+ }
const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
// `spill_batch_rows` will be between 4k and 1M
// and each block to spill will not be larger than
32MB(`MAX_SPILL_WRITE_BATCH_MEM`)
+ // TODO: yiguolei, should review this logic
const auto spill_batch_rows = std::min<size_t>(
- 1024 * 1024, std::max<size_t>(4096,
SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
- total_rows /
size_to_revoke_));
+
+ 1024 * 1024, std::max<size_t>(4096,
SpillFile::MAX_SPILL_WRITE_BATCH_MEM * total_rows /
+ size_to_revoke_));
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
- << ", spill_batch_rows: " << spill_batch_rows << ", total rows:
" << total_rows;
+ << ", spill_batch_rows: " << spill_batch_rows << ", total rows:
" << total_rows
+ << ", size_to_revoke: " << size_to_revoke;
size_t row_count = 0;
std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
- Base::_shared_state->partition_count);
- auto& iter =
Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator;
- while (iter !=
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() &&
+ parent._partition_count);
+ auto& iter =
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->iterator;
+ while (iter !=
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->end() &&
!state->is_cancelled()) {
const auto& key = iter.template get_key<typename
HashTableType::key_type>();
- auto partition_index =
Base::_shared_state->get_partition_index(hash_table.hash(key));
+ auto partition_index = hash_table.hash(key) % parent._partition_count;
Review Comment:
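What if `parent._partition_count == 0`? The new
`hash_table.hash(key) % parent._partition_count` on this line would then be a
remainder by zero, which is undefined behavior in C++ (typically a crash).
Is the partition count guaranteed to be non-zero before we get here, or
should it be checked?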
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]