This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 49e123c9083 [Bug](join) fix broadcast join running when hash table
build not finished (#37643)
49e123c9083 is described below
commit 49e123c90838cde11a62a9fd8be894ad842d3549
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jul 15 11:53:06 2024 +0800
[Bug](join) fix broadcast join running when hash table build not finished
(#37643)
## Proposed changes
Previously, when a PipelineTask closed, the sink operator was always set to
"always ready". However, not every sink can run normally in that state. For
example, a join instance that does not build the hash table itself
needs to wait until another instance has finished building the hash table
and then share it.
```
F20240710 17:29:09.628299 221449 hashjoin_build_sink.cpp:582] Check failed:
_shared_hash_table_context->signaled
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
siginfo_t*, void*) at
/mnt/disk2/zhangsida/doris/be/src/common/signal_handler.h:421
1# 0x00007FEF9BF64B50 in /lib64/libc.so.6
2# gsignal in /lib64/libc.so.6
3# __GI_abort in /lib64/libc.so.6
4# 0x0000559C8BD8BE8D in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
5# 0x0000559C8BD7E52A in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
6# google::LogMessage::SendToLog() in
/mnt/disk2/zhangsida/doris/output/be/lib/doris_be
7# google::LogMessage::Flush() in
/mnt/disk2/zhangsida/doris/output/be/lib/doris_be
8# google::LogMessageFatal::~LogMessageFatal() in
/mnt/disk2/zhangsida/doris/output/be/lib/doris_be
9# doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*,
doris::vectorized::Block*, bool) at
/mnt/disk2/zhangsida/doris/be/src/pipeline/exec/hashjoin_build_sink.cpp:582
10# doris::pipeline::PipelineTask::execute(bool*)::$_1::operator()() const
at /mnt/disk2/zhangsida/doris/be/src/pipeline/pipeline_task.cpp:361
11# doris::pipeline::PipelineTask::execute(bool*) at
/mnt/disk2/zhangsida/doris/be/src/pipeline/pipeline_task.cpp:364
12# doris::pipeline::TaskScheduler::_do_work(unsigned long) at
/mnt/disk2/zhangsida/doris/be/src/pipeline/task_scheduler.cpp:138
13# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at
/mnt/disk2/zhangsida/doris/be/src/pipeline/task_scheduler.cpp:64
```
---
be/src/pipeline/exec/analytic_source_operator.cpp | 5 +
be/src/pipeline/exec/operator.cpp | 5 -
regression-test/data/query_p0/join/test_join6.out | 5 +
.../suites/query_p0/join/test_join6.groovy | 291 +++++++++++++++++++++
4 files changed, 301 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index a036481d727..93e87cbce5d 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,6 +559,11 @@ Status AnalyticLocalState::close(RuntimeState* state) {
std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
+ // Some kinds of source operators has a 1-1 relationship with a sink
operator (such as AnalyticOperator).
+ // We must ensure AnalyticSinkOperator will not be blocked if
AnalyticSourceOperator already closed.
+ if (_shared_state && _shared_state->sink_deps.size() == 1) {
+ _shared_state->sink_deps.front()->set_always_ready();
+ }
return PipelineXLocalState<AnalyticSharedState>::close(state);
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index ba3602a91cb..eba380f4386 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -511,11 +511,6 @@ Status
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
_closed = true;
- // Some kinds of source operators has a 1-1 relationship with a sink
operator (such as AnalyticOperator).
- // We must ensure AnalyticSinkOperator will not be blocked if
AnalyticSourceOperator already closed.
- if (_shared_state && _shared_state->sink_deps.size() == 1) {
- _shared_state->sink_deps.front()->set_always_ready();
- }
return Status::OK();
}
diff --git a/regression-test/data/query_p0/join/test_join6.out
b/regression-test/data/query_p0/join/test_join6.out
new file mode 100644
index 00000000000..eca6185e834
--- /dev/null
+++ b/regression-test/data/query_p0/join/test_join6.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_hash_join --
+
+-- !select_hash_join2 --
+
diff --git a/regression-test/suites/query_p0/join/test_join6.groovy
b/regression-test/suites/query_p0/join/test_join6.groovy
new file mode 100644
index 00000000000..612ea373793
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_join6.groovy
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_join6", "query,p0") {
+ def DBname = "regression_test_join6"
+ sql "DROP DATABASE IF EXISTS ${DBname}"
+ sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
+ sql "use ${DBname}"
+
+ def tbName1 =
"table_20_undef_partitions2_keys3_properties4_distributed_by5"
+ def tbName2 =
"table_20_undef_partitions2_keys3_properties4_distributed_by52"
+ def tbName3 =
"table_30_undef_partitions2_keys3_properties4_distributed_by5"
+ def tbName4 =
"table_30_undef_partitions2_keys3_properties4_distributed_by52"
+ def tbName5 =
"table_50_undef_partitions2_keys3_properties4_distributed_by5"
+ def tbName6 =
"table_50_undef_partitions2_keys3_properties4_distributed_by52"
+ def tbName7 =
"table_100_undef_partitions2_keys3_properties4_distributed_by5"
+ def tbName8 =
"table_100_undef_partitions2_keys3_properties4_distributed_by52"
+ def tbName9 =
"table_200_undef_partitions2_keys3_properties4_distributed_by5"
+ def tbName10 =
"table_200_undef_partitions2_keys3_properties4_distributed_by52"
+
+ sql "DROP TABLE IF EXISTS ${tbName1};"
+ sql "DROP TABLE IF EXISTS ${tbName2};"
+ sql "DROP TABLE IF EXISTS ${tbName3};"
+ sql "DROP TABLE IF EXISTS ${tbName4};"
+ sql "DROP TABLE IF EXISTS ${tbName5};"
+ sql "DROP TABLE IF EXISTS ${tbName6};"
+ sql "DROP TABLE IF EXISTS ${tbName7};"
+ sql "DROP TABLE IF EXISTS ${tbName8};"
+ sql "DROP TABLE IF EXISTS ${tbName9};"
+ sql "DROP TABLE IF EXISTS ${tbName10};"
+
+ sql """
+ create table
table_20_undef_partitions2_keys3_properties4_distributed_by5 (
+ col_int_undef_signed2 int ,
+ col_int_undef_signed int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed2)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_20_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,6,4,-3343259,7),(1,null,2,-5659896,0),(2,2,2369913,-5247778,-4711382),(3,6545002,3,2,4),(4,9,3,4,5),(5,4,5,4,1),(6,4,-4704791,null,6),(7,null,3,null,9),(8,-1012411,4,null,-1244656),(9,1,8,9,-5175872),(10,8,0,-4239951,2),(11,8,-2231762,4817469,2),(12,9,9,5,-427963),(13,4,0,null,-5587539),(14,-5949786,2,2,34322
[...]
+ """
+
+ sql """
+ create table
table_20_undef_partitions2_keys3_properties4_distributed_by52 (
+ col_int_undef_signed int ,
+ col_int_undef_signed2 int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+ PARTITION BY RANGE(col_int_undef_signed) (
+ PARTITION p0 VALUES LESS THAN ('4'),
+ PARTITION p1 VALUES LESS THAN ('6'),
+ PARTITION p2 VALUES LESS THAN ('7'),
+ PARTITION p3 VALUES LESS THAN ('8'),
+ PARTITION p4 VALUES LESS THAN ('10'),
+ PARTITION p5 VALUES LESS THAN ('83647'),
+ PARTITION p100 VALUES LESS THAN ('2147483647')
+ )
+
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_20_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,6,-179064,5213411,5),(1,3,5,2,6),(2,4226261,7,null,3),(3,9,null,4,4),(4,-1003770,2,1,1),(5,8,7,null,8176864),(6,3388266,5,8,8),(7,5,1,2,null),(8,9,2064412,0,null),(9,1489553,8,-446412,6),(10,1,3,0,1),(11,null,3,4621304,null),(12,null,-3058026,-262645,9),(13,null,null,9,3),(14,null,null,5037128,7),(15,299896,
[...]
+ """
+
+ sql """
+ create table
table_30_undef_partitions2_keys3_properties4_distributed_by5 (
+ col_int_undef_signed2 int ,
+ col_int_undef_signed int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed2)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_30_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,2,null,0,null),(1,-242819,2983243,7071252,3),(2,1,-2342407,-1423905,8),(3,null,null,7,4),(4,-1494065,3,7,2),(5,5,0,-595225,5),(6,5,-3324113,0,5),(7,6829192,3527453,6,5436506),(8,1,-3189592,2,9),(9,null,2,6,2),(10,-4070807,null,-3324205,7),(11,8,-5293967,1,-5040205),(12,6,7440524,null,null),(13,null,2,9,5),(14
[...]
+ """
+
+ sql """
+ create table
table_30_undef_partitions2_keys3_properties4_distributed_by52 (
+ col_int_undef_signed int ,
+ col_int_undef_signed2 int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+ PARTITION BY RANGE(col_int_undef_signed) (
+ PARTITION p0 VALUES LESS THAN ('4'),
+ PARTITION p1 VALUES LESS THAN ('6'),
+ PARTITION p2 VALUES LESS THAN ('7'),
+ PARTITION p3 VALUES LESS THAN ('8'),
+ PARTITION p4 VALUES LESS THAN ('10'),
+ PARTITION p5 VALUES LESS THAN ('83647'),
+ PARTITION p100 VALUES LESS THAN ('2147483647')
+ )
+
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_30_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,9,9,null,1),(1,6821639,9,null,-5431086),(2,8,4,6,7701043),(3,2,-6700938,1425835,7),(4,null,1,3,4),(5,8,8,-714745,null),(6,7,3,4447765,null),(7,1,-2101501,0,5),(8,7,0,9,6),(9,4696294,3,2,-3197661),(10,8,4600901,8,1),(11,-1042936,null,-2187191,0),(12,5116430,0,2687672,9),(13,3,3,8,1287742),(14,-3829647,3,4,751
[...]
+ """
+
+ sql """
+ create table
table_50_undef_partitions2_keys3_properties4_distributed_by5 (
+ col_int_undef_signed2 int ,
+ col_int_undef_signed int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed2)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_50_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,8,0,3,7),(1,6,227612,4,8),(2,-590975,9,-4411568,6),(3,-7241036,null,3,5),(4,1,7,null,8),(5,2509741,5,5,1),(6,2,9,null,4817793),(7,6,8,3,0),(8,null,1,4,null),(9,711269,null,-613109,null),(10,null,7,0,7),(11,null,-5534845,0,4),(12,5,2,9,6850777),(13,-5789051,8,6,2463068),(14,2,5,953451,1),(15,-6229147,-6738861,
[...]
+ """
+
+ sql """
+ create table
table_50_undef_partitions2_keys3_properties4_distributed_by52 (
+ col_int_undef_signed int ,
+ col_int_undef_signed2 int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+ PARTITION BY RANGE(col_int_undef_signed) (
+ PARTITION p0 VALUES LESS THAN ('4'),
+ PARTITION p1 VALUES LESS THAN ('6'),
+ PARTITION p2 VALUES LESS THAN ('7'),
+ PARTITION p3 VALUES LESS THAN ('8'),
+ PARTITION p4 VALUES LESS THAN ('10'),
+ PARTITION p5 VALUES LESS THAN ('83647'),
+ PARTITION p100 VALUES LESS THAN ('2147483647')
+ )
+
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_50_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,-7314662,null,0,4373927),(1,0,9,2,null),(2,5,2151343,-1467194,null),(3,null,null,-6124108,null),(4,5795207,4306466,4,7),(5,6,8,3,9),(6,null,8,-7232808,9),(7,9,6,9,6),(8,4637962,-1241311,2,8),(9,1,2,3,null),(10,0,-1652390,1,3),(11,0,9,6,2),(12,-8342795,0,5539034,-4960208),(13,2768087,7,-6242297,4996873),(14,1
[...]
+ """
+
+ sql """
+ create table
table_100_undef_partitions2_keys3_properties4_distributed_by5 (
+ col_int_undef_signed2 int ,
+ col_int_undef_signed int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed2)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_100_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,3,7164641,5,8),(1,null,3916062,5,6),(2,1,5533498,0,9),(3,7,2,null,7057679),(4,1,0,7,7),(5,null,4,2448564,1),(6,7531976,7324373,9,7),(7,3,1,1,3),(8,6,8131576,9,-1793807),(9,9,2,4214547,9),(10,-7299852,5,1,3),(11,7,3,-1036551,5),(12,-6108579,84823,4,1229534),(13,-1065629,5,4,null),(14,null,8072633,3328285,2),(
[...]
+ """
+
+
+ sql """
+ create table
table_100_undef_partitions2_keys3_properties4_distributed_by52 (
+ col_int_undef_signed int ,
+ col_int_undef_signed2 int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+ PARTITION BY RANGE(col_int_undef_signed) (
+ PARTITION p0 VALUES LESS THAN ('4'),
+ PARTITION p1 VALUES LESS THAN ('6'),
+ PARTITION p2 VALUES LESS THAN ('7'),
+ PARTITION p3 VALUES LESS THAN ('8'),
+ PARTITION p4 VALUES LESS THAN ('10'),
+ PARTITION p5 VALUES LESS THAN ('83647'),
+ PARTITION p100 VALUES LESS THAN ('2147483647')
+ )
+
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_100_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,7865838,-348902,null,8),(1,-9434,9,8,0),(2,1845860,6675073,-7931956,-66007),(3,-7523286,210291,3,4),(4,null,-1341350,-5318642,1),(5,-6634226,2179558,2,7),(6,2,7,2,3),(7,9,2,3,-7773846),(8,0,8,6,2407384),(9,0,1,7,7),(10,5,5,null,8),(11,9,null,8283010,6),(12,7359987,5145929,2,5),(13,0,5225949,0,6770846),(14,1
[...]
+ """
+
+ sql """
+ create table
table_200_undef_partitions2_keys3_properties4_distributed_by5 (
+ col_int_undef_signed2 int ,
+ col_int_undef_signed int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed2)
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_200_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,null,7,3,9),(1,6970022,9,6,2),(2,null,0,null,7262031),(3,4,6,null,7236151),(4,789682,7324018,5,5),(5,-2056178,9,0,0),(6,-7081969,-2103366,0,1),(7,3,5,3,3),(8,3175437,4,6,-2017026),(9,3,null,null,7),(10,-5725039,5,2,3),(11,8,9,2,5),(12,-6487649,1,5,-2847073),(13,3415118,null,4,-6786736),(14,null,4,7,1),(15,99
[...]
+ """
+
+ sql """
+ create table
table_200_undef_partitions2_keys3_properties4_distributed_by52 (
+ col_int_undef_signed int ,
+ col_int_undef_signed2 int ,
+ col_int_undef_signed3 int ,
+ col_int_undef_signed4 int ,
+ pk int
+ ) engine=olap
+ DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+ PARTITION BY RANGE(col_int_undef_signed) (
+ PARTITION p0 VALUES LESS THAN ('4'),
+ PARTITION p1 VALUES LESS THAN ('6'),
+ PARTITION p2 VALUES LESS THAN ('7'),
+ PARTITION p3 VALUES LESS THAN ('8'),
+ PARTITION p4 VALUES LESS THAN ('10'),
+ PARTITION p5 VALUES LESS THAN ('83647'),
+ PARTITION p100 VALUES LESS THAN ('2147483647')
+ )
+
+ distributed by hash(pk) buckets 10
+ properties("replication_num" = "1");
+ """
+ sql """
+ insert into
table_200_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
values
(0,null,6178782,4,-1498997),(1,null,null,2,4),(2,8,6,6114625,6840353),(3,6,-3487226,4,-18364),(4,6647558,0,7,4),(5,5,1,3,3991803),(6,null,3,3,6),(7,-1597140,3,3,2),(8,6415967,null,9,null),(9,0,2,-1569216,8263281),(10,2546741,4,-4334118,8),(11,2375117,5,null,-3767162),(12,4,290235,null,6),(13,5569849,8,6,null),
[...]
+ """
+
+ qt_select_hash_join """
+ SELECT /*+ leading( tbl5 { tbl3 tbl4 } tbl1 tbl2 ) */ tbl4 .
col_int_undef_signed4 AS col_int_undef_signed , 2 AS col_int_undef_signed2 ,
tbl3 . col_int_undef_signed2 AS col_int_undef_signed3 , tbl1 .
col_int_undef_signed AS col_int_undef_signed4 FROM
table_50_undef_partitions2_keys3_properties4_distributed_by5 AS tbl1 INNER
JOIN table_20_undef_partitions2_keys3_properties4_distributed_by5 AS tbl2 ON
tbl2 . col_int_undef_signed = tbl1 . col_int_undef_signed2 AND tbl2 . col_ [...]
+ """
+
+ qt_select_hash_join2 """
+ SELECT
+ /*+ ORDERED leading( { tbl3 tbl4 tbl1 tbl2 } ) */
+ 3
+ FROM
+ table_30_undef_partitions2_keys3_properties4_distributed_by52 AS
tbl1
+ WHERE
+ (
+ tbl1.col_int_undef_signed2 IN (
+ SELECT
+ 0
+ FROM
+ (
+ SELECT
+ 1
+ FROM
+
table_20_undef_partitions2_keys3_properties4_distributed_by52 AS tbl1
+ JOIN
table_30_undef_partitions2_keys3_properties4_distributed_by52 AS tbl2 ON
tbl2.col_int_undef_signed4 = tbl1.col_int_undef_signed2
+ ) AS tbl1
+ )
+ AND (
+ tbl1.col_int_undef_signed2 NOT IN (
+ SELECT
+ tbl1.col_int_undef_signed AS col_int_undef_signed
+ FROM
+
table_20_undef_partitions2_keys3_properties4_distributed_by52 AS tbl1
+ )
+ )
+ );
+ """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]