This is an automated email from the ASF dual-hosted git repository.
maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 6c41d270a2e Add runtime filter pushdown support for DynamicSeqscan operator
6c41d270a2e is described below
commit 6c41d270a2ec968eadf011401ba1e5d12a5ab71f
Author: zhangyue <[email protected]>
AuthorDate: Wed Jul 2 16:22:29 2025 +0800
Add runtime filter pushdown support for DynamicSeqscan operator
- Extend runtime filter pushdown to DynamicSeqscan;
- Update related executor nodes and headers;
- Add a regression test (case 7) covering runtime filter pushdown into a Dynamic Seq Scan on a partitioned table.
---
src/backend/commands/explain.c | 21 +++++-----
src/backend/executor/nodeDynamicSeqscan.c | 9 +++++
src/backend/executor/nodeHash.c | 36 +++++++++++++++--
src/backend/executor/nodeHashjoin.c | 16 ++++----
src/backend/executor/nodeSeqscan.c | 18 ++++-----
src/include/executor/nodeSeqscan.h | 2 +
src/include/nodes/execnodes.h | 3 ++
src/test/regress/expected/gp_runtime_filter.out | 54 +++++++++++++++++++++++++
src/test/regress/sql/gp_runtime_filter.sql | 29 +++++++++++++
9 files changed, 156 insertions(+), 32 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index e87fb30e7c4..0d63d374128 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -140,8 +140,7 @@ static void show_incremental_sort_info(IncrementalSortState
*incrsortstate,
static void show_hash_info(HashState *hashstate, ExplainState *es);
static void show_runtime_filter_info(RuntimeFilterState *rfstate,
ExplainState *es);
-static void show_pushdown_runtime_filter_info(const char *qlabel,
-
PlanState *planstate,
+static void show_pushdown_runtime_filter_info(PlanState *planstate,
ExplainState *es);
static void show_memoize_info(MemoizeState *mstate, List *ancestors,
ExplainState *es);
@@ -2500,8 +2499,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
/* FALLTHROUGH */
case T_SeqScan:
if (gp_enable_runtime_filter_pushdown && IsA(planstate,
SeqScanState))
- show_pushdown_runtime_filter_info("Rows Removed
by Pushdown Runtime Filter",
-
planstate, es);
+ show_pushdown_runtime_filter_info(planstate,
es);
/* FALLTHROUGH */
case T_DynamicSeqScan:
case T_ValuesScan:
@@ -2510,6 +2508,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_WorkTableScan:
case T_SubqueryScan:
if (IsA(plan, DynamicSeqScan)) {
+ if (gp_enable_runtime_filter_pushdown)
+
show_pushdown_runtime_filter_info(planstate, es);
+
char *buf;
Oid relid;
relid = rt_fetch(((DynamicSeqScan *)plan)
@@ -4417,17 +4418,19 @@ show_instrumentation_count(const char *qlabel, int
which,
* runtime filter.
*/
static void
-show_pushdown_runtime_filter_info(const char *qlabel,
- PlanState
*planstate,
- ExplainState
*es)
+show_pushdown_runtime_filter_info(PlanState *planstate, ExplainState *es)
{
- Assert(gp_enable_runtime_filter_pushdown && IsA(planstate,
SeqScanState));
+ Assert(gp_enable_runtime_filter_pushdown &&
+ (IsA(planstate, SeqScanState) || IsA(planstate,
DynamicSeqScanState)));
if (!es->analyze || !planstate->instrument)
return;
if (planstate->instrument->prf_work)
- ExplainPropertyFloat(qlabel, NULL,
planstate->instrument->nfilteredPRF, 0, es);
+ {
+ ExplainPropertyFloat("Rows Removed by Pushdown Runtime Filter",
+ NULL,
planstate->instrument->nfilteredPRF, 0, es);
+ }
}
/*
diff --git a/src/backend/executor/nodeDynamicSeqscan.c
b/src/backend/executor/nodeDynamicSeqscan.c
index c13be0a1f1b..3380b907ba3 100644
--- a/src/backend/executor/nodeDynamicSeqscan.c
+++ b/src/backend/executor/nodeDynamicSeqscan.c
@@ -223,7 +223,16 @@ ExecDynamicSeqScan(PlanState *pstate)
slot = ExecProcNode(&node->seqScanState->ss.ps);
if (!TupIsNull(slot))
+ {
+ if (gp_enable_runtime_filter_pushdown
+ && !pstate->state->useMppParallelMode
+ && node->filters)
+ {
+ if (!PassByBloomFilter(&node->ss.ps,
node->filters, slot))
+ continue;
+ }
break;
+ }
/* No more tuples from this partition. Move to next one. */
CleanupOnePartition(node);
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 53e55e26f91..62d3c2da790 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -4161,7 +4161,9 @@ PushdownRuntimeFilter(HashState *node)
scankeys = NIL;
attr_filter = lfirst(lc);
- if (!IsA(attr_filter->target, SeqScanState) ||
attr_filter->empty)
+ if (attr_filter->empty ||
+ (!IsA(attr_filter->target, SeqScanState) &&
+ !IsA(attr_filter->target, DynamicSeqScanState)))
continue;
/* bloom filter */
@@ -4190,8 +4192,21 @@ PushdownRuntimeFilter(HashState *node)
scankeys = lappend(scankeys, sk);
/* append new runtime filters to target node */
- SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
- sss->filters = list_concat(sss->filters, scankeys);
+ if (IsA(attr_filter->target, SeqScanState))
+ {
+ SeqScanState *sss = castNode(SeqScanState,
attr_filter->target);
+ sss->filters = list_concat(sss->filters, scankeys);
+ }
+ else if (IsA(attr_filter->target, DynamicSeqScanState))
+ {
+ DynamicSeqScanState *dsss =
castNode(DynamicSeqScanState, attr_filter->target);
+ dsss->filters = list_concat(dsss->filters, scankeys);
+ }
+ else
+ {
+ /* never reach here */
+ pg_unreachable();
+ }
}
}
@@ -4250,6 +4265,7 @@ ResetRuntimeFilter(HashState *node)
ListCell *lc;
AttrFilter *attr_filter;
SeqScanState *sss;
+ DynamicSeqScanState *dsss;
if (!node->filters)
return;
@@ -4268,6 +4284,20 @@ ResetRuntimeFilter(HashState *node)
sss->filters = NIL;
}
}
+ else if (IsA(attr_filter->target, DynamicSeqScanState))
+ {
+ dsss = castNode(DynamicSeqScanState,
attr_filter->target);
+ if (dsss->filters)
+ {
+ list_free_deep(dsss->filters);
+ dsss->filters = NIL;
+ }
+ }
+ else
+ {
+ /* never reach here */
+ pg_unreachable();
+ }
if (attr_filter->blm_filter)
bloom_free(attr_filter->blm_filter);
diff --git a/src/backend/executor/nodeHashjoin.c
b/src/backend/executor/nodeHashjoin.c
index 2a60c777184..b14446dc953 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -2239,7 +2239,7 @@ CreateRuntimeFilter(HashJoinState* hjstate)
foreach(lc, targets)
{
PlanState *target = lfirst(lc);
- Assert(IsA(target, SeqScanState));
+ Assert(IsA(target, SeqScanState) || IsA(target,
DynamicSeqScanState));
attr_filter = CreateAttrFilter(target, lattno, rattno,
hstate->ps.plan->plan_rows);
@@ -2334,7 +2334,7 @@ CheckTargetNode(PlanState *node, AttrNumber attno,
AttrNumber *lattno)
Var *var;
TargetEntry *te;
- if (!IsA(node, SeqScanState))
+ if (!IsA(node, SeqScanState) && !IsA(node, DynamicSeqScanState))
return false;
te = (TargetEntry *)list_nth(node->plan->targetlist, attno - 1);
@@ -2375,16 +2375,14 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber
attno, AttrNumber *lattno)
targetNodes = NIL;
while (true)
{
- /* target is seqscan */
- if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) &&
IsA(child, SeqScanState))
+ /* target is seqscan or dynamic seqscan */
+ if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) &&
+ (IsA(child, SeqScanState) || IsA(child,
DynamicSeqScanState)))
{
/*
* hashjoin
- * seqscan
- * or
- * hashjoin
- * result
- * seqscan
+ * [result]
+ * seqscan | dynamicseqscan
*/
if (!CheckTargetNode(child, attno, lattno))
return NULL;
diff --git a/src/backend/executor/nodeSeqscan.c
b/src/backend/executor/nodeSeqscan.c
index 94270b8231b..c9debfb1fd0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -42,8 +42,6 @@
#include "cdb/cdbvars.h"
static TupleTableSlot *SeqNext(SeqScanState *node);
-
-static bool PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot);
static ScanKey ScanKeyListToArray(List *keys, int *num);
/* ----------------------------------------------------------------
@@ -104,7 +102,7 @@ SeqNext(SeqScanState *node)
{
while (table_scan_getnextslot(scandesc, direction, slot))
{
- if (!PassByBloomFilter(node, slot))
+ if (!PassByBloomFilter(&node->ss.ps, node->filters,
slot))
continue;
return slot;
@@ -405,8 +403,8 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
/*
* Returns true if the element may be in the bloom filter.
*/
-static bool
-PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+bool
+PassByBloomFilter(PlanState *ps, List *filters, TupleTableSlot *slot)
{
ScanKey sk;
Datum val;
@@ -417,12 +415,10 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot
*slot)
/*
* Mark that the pushdown runtime filter is actually taking effect.
*/
- if (node->ss.ps.instrument &&
- !node->ss.ps.instrument->prf_work &&
- list_length(node->filters))
- node->ss.ps.instrument->prf_work = true;
+ if (ps->instrument && !ps->instrument->prf_work && list_length(filters))
+ ps->instrument->prf_work = true;
- foreach (lc, node->filters)
+ foreach (lc, filters)
{
sk = lfirst(lc);
if (sk->sk_flags != SK_BLOOM_FILTER)
@@ -435,7 +431,7 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument);
if (bloom_lacks_element(blm_filter, (unsigned char *)&val,
sizeof(Datum)))
{
- InstrCountFilteredPRF(node, 1);
+ InstrCountFilteredPRF(ps, 1);
return false;
}
}
diff --git a/src/include/executor/nodeSeqscan.h
b/src/include/executor/nodeSeqscan.h
index 170286a8d5e..efc712dc4b7 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -30,4 +30,6 @@ extern void ExecSeqScanReInitializeDSM(SeqScanState *node,
ParallelContext *pcxt
extern void ExecSeqScanInitializeWorker(SeqScanState *node,
ParallelWorkerContext *pwcxt);
+extern bool PassByBloomFilter(PlanState *ps, List *filters, TupleTableSlot
*slot);
+
#endif /* NODESEQSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e673dd47736..ede431f9ff3 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2231,6 +2231,9 @@ typedef struct DynamicSeqScanState
struct PartitionPruneState *as_prune_state; /* partition dynamic
pruning state */
Bitmapset *as_valid_subplans; /* used to determine partitions during
dynamic pruning*/
bool did_pruning; /* flag that is set once dynamic pruning
is performed */
+
+ /* runtime filter support */
+ List *filters; /* the list of struct
ScanKeyData for runtime filters */
} DynamicSeqScanState;
/*
diff --git a/src/test/regress/expected/gp_runtime_filter.out
b/src/test/regress/expected/gp_runtime_filter.out
index cc1531a707c..32287b63d01 100644
--- a/src/test/regress/expected/gp_runtime_filter.out
+++ b/src/test/regress/expected/gp_runtime_filter.out
@@ -535,6 +535,60 @@ RESET gp_enable_runtime_filter_pushdown;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
+-- case 7: scan partition table with dynamic scan
+CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2)
(START (1) END (100) EVERY (50));
+CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED;
+INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5);
+INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51);
+ANALYZE;
+SET optimizer TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1)
+ -> Hash Join (actual rows=64 loops=1)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of
524288 buckets.
+ -> Dynamic Seq Scan on t1 (actual rows=608 loops=1)
+ Number of partitions to scan: 2 (out of 2)
+ Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0).
+ -> Hash (actual rows=6 loops=1)
+ Buckets: 524288 Batches: 1 Memory Usage: 4097kB
+ -> Partition Selector (selector id: $0) (actual rows=6 loops=1)
+ -> Seq Scan on t2 (actual rows=6 loops=1)
+ Optimizer: GPORCA
+(12 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1)
+ -> Hash Join (actual rows=64 loops=1)
+ Hash Cond: (t1.c2 = t2.c2)
+ Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of
524288 buckets.
+ -> Dynamic Seq Scan on t1 (actual rows=64 loops=1)
+ Rows Removed by Pushdown Runtime Filter: 544
+ Number of partitions to scan: 2 (out of 2)
+ Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0).
+ -> Hash (actual rows=6 loops=1)
+ Buckets: 524288 Batches: 1 Memory Usage: 4097kB
+ -> Partition Selector (selector id: $0) (actual rows=6 loops=1)
+ -> Seq Scan on t2 (actual rows=6 loops=1)
+ Optimizer: GPORCA
+(13 rows)
+
+RESET gp_enable_runtime_filter_pushdown;
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+SET optimizer TO off;
RESET enable_parallel;
-- Clean up: reset guc
SET gp_enable_runtime_filter TO off;
diff --git a/src/test/regress/sql/gp_runtime_filter.sql
b/src/test/regress/sql/gp_runtime_filter.sql
index fc1fe487745..6a0c6e0cafb 100644
--- a/src/test/regress/sql/gp_runtime_filter.sql
+++ b/src/test/regress/sql/gp_runtime_filter.sql
@@ -227,6 +227,35 @@ DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
DROP TABLE IF EXISTS t3;
+-- case 7: scan partition table with dynamic scan
+CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2)
(START (1) END (100) EVERY (50));
+CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED;
+INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5);
+INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51);
+ANALYZE;
+
+SET optimizer TO on;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+
+SET gp_enable_runtime_filter_pushdown TO on;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+
+RESET gp_enable_runtime_filter_pushdown;
+
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+
+SET optimizer TO off;
+
RESET enable_parallel;
-- Clean up: reset guc
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]