This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 12666c2e53b54862ecfd0c9045082f82c86d4c58
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 31 11:13:05 2022 +0800

    [Bug] Fix DCHECK failed in runtime filter and mutable block (#8720)
    
    Co-authored-by: lihaopeng <[email protected]>
---
 be/src/exprs/runtime_filter.cpp         |  3 ++-
 be/src/vec/sink/vdata_stream_sender.cpp | 13 ++++++++-----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 48cee87..73eff85 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -458,7 +458,8 @@ public:
         DCHECK(state != nullptr);
         DCHECK(container != nullptr);
         DCHECK(_pool != nullptr);
-        DCHECK(prob_expr->root()->type().type == _column_return_type);
+        DCHECK(prob_expr->root()->type().type == _column_return_type ||
+        (is_string_type(prob_expr->root()->type().type) && 
is_string_type(_column_return_type)));
 
         auto real_filter_type = get_real_type();
         switch (real_filter_type) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index fc55ca8..cff400c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -425,9 +425,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
         }
         _current_channel_idx = (_current_channel_idx + 1) % _channels.size();
     } else if (_part_type == TPartitionType::HASH_PARTITIONED) {
-        int num_channels = _channels.size();
         // will only copy schema
         // we don't want send temp columns
+        auto column_to_keep = block->columns();
 
         int result_size = _partition_expr_ctxs.size();
         int result[result_size];
@@ -451,12 +451,14 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
             hash_vals[i] = siphashs[i].get64();
         }
 
-        RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, 
rows, block));
+        Block::erase_useless_column(block, column_to_keep);
+        RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), 
hash_vals, rows, block));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // will only copy schema
+        // we don't want send temp columns
+        auto column_to_keep = block->columns();
         // 1. calculate hash
         // 2. dispatch rows to channel
-        int num_channels = _channel_shared_ptrs.size();
-
         int result_size = _partition_expr_ctxs.size();
         int result[result_size];
         RETURN_IF_ERROR(get_partition_column_result(block, result));
@@ -484,8 +486,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
             }
         }
 
+        Block::erase_useless_column(block, column_to_keep);
         RETURN_IF_ERROR(
-                channel_add_rows(_channel_shared_ptrs, num_channels, 
hash_vals, rows, block));
+                channel_add_rows(_channel_shared_ptrs, 
_channel_shared_ptrs.size(), hash_vals, rows, block));
     } else {
         // Range partition
         // 1. calculate range

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to