This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c41d270a2e Add runtime filter pushdown support for DynamicSeqscan operator
6c41d270a2e is described below

commit 6c41d270a2ec968eadf011401ba1e5d12a5ab71f
Author: zhangyue <[email protected]>
AuthorDate: Wed Jul 2 16:22:29 2025 +0800

    Add runtime filter pushdown support for DynamicSeqscan operator
    
    - Extend runtime filter pushdown to DynamicSeqscan;
    - Update related executor nodes and headers;
    - Update regression tests
---
 src/backend/commands/explain.c                  | 21 +++++-----
 src/backend/executor/nodeDynamicSeqscan.c       |  9 +++++
 src/backend/executor/nodeHash.c                 | 36 +++++++++++++++--
 src/backend/executor/nodeHashjoin.c             | 16 ++++----
 src/backend/executor/nodeSeqscan.c              | 18 ++++-----
 src/include/executor/nodeSeqscan.h              |  2 +
 src/include/nodes/execnodes.h                   |  3 ++
 src/test/regress/expected/gp_runtime_filter.out | 54 +++++++++++++++++++++++++
 src/test/regress/sql/gp_runtime_filter.sql      | 29 +++++++++++++
 9 files changed, 156 insertions(+), 32 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index e87fb30e7c4..0d63d374128 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -140,8 +140,7 @@ static void show_incremental_sort_info(IncrementalSortState 
*incrsortstate,
 static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_runtime_filter_info(RuntimeFilterState *rfstate,
                                                                         
ExplainState *es);
-static void show_pushdown_runtime_filter_info(const char *qlabel,
-                                                                               
          PlanState *planstate,
+static void show_pushdown_runtime_filter_info(PlanState *planstate,
                                                                                
          ExplainState *es);
 static void show_memoize_info(MemoizeState *mstate, List *ancestors,
                                                          ExplainState *es);
@@ -2500,8 +2499,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
                        /* FALLTHROUGH */
                case T_SeqScan:
                        if (gp_enable_runtime_filter_pushdown && IsA(planstate, 
SeqScanState))
-                               show_pushdown_runtime_filter_info("Rows Removed 
by Pushdown Runtime Filter",
-                                                                               
                  planstate, es);
+                               show_pushdown_runtime_filter_info(planstate, 
es);
                        /* FALLTHROUGH */
                case T_DynamicSeqScan:
                case T_ValuesScan:
@@ -2510,6 +2508,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
                case T_WorkTableScan:
                case T_SubqueryScan:
                        if (IsA(plan, DynamicSeqScan)) {
+                               if (gp_enable_runtime_filter_pushdown)
+                                       
show_pushdown_runtime_filter_info(planstate, es);
+
                                char *buf;
                                Oid relid;
                                relid = rt_fetch(((DynamicSeqScan *)plan)
@@ -4417,17 +4418,19 @@ show_instrumentation_count(const char *qlabel, int 
which,
  * runtime filter.
  */
 static void
-show_pushdown_runtime_filter_info(const char *qlabel,
-                                                                 PlanState 
*planstate,
-                                                                 ExplainState 
*es)
+show_pushdown_runtime_filter_info(PlanState *planstate, ExplainState *es)
 {
-       Assert(gp_enable_runtime_filter_pushdown && IsA(planstate, 
SeqScanState));
+       Assert(gp_enable_runtime_filter_pushdown && 
+                  (IsA(planstate, SeqScanState) || IsA(planstate, 
DynamicSeqScanState)));
 
        if (!es->analyze || !planstate->instrument)
                return;
 
        if (planstate->instrument->prf_work)
-               ExplainPropertyFloat(qlabel, NULL, 
planstate->instrument->nfilteredPRF, 0, es);
+       {
+               ExplainPropertyFloat("Rows Removed by Pushdown Runtime Filter",
+                                                        NULL, 
planstate->instrument->nfilteredPRF, 0, es);
+       }
 }
 
 /*
diff --git a/src/backend/executor/nodeDynamicSeqscan.c 
b/src/backend/executor/nodeDynamicSeqscan.c
index c13be0a1f1b..3380b907ba3 100644
--- a/src/backend/executor/nodeDynamicSeqscan.c
+++ b/src/backend/executor/nodeDynamicSeqscan.c
@@ -223,7 +223,16 @@ ExecDynamicSeqScan(PlanState *pstate)
                slot = ExecProcNode(&node->seqScanState->ss.ps);
 
                if (!TupIsNull(slot))
+               {
+                       if (gp_enable_runtime_filter_pushdown
+                               && !pstate->state->useMppParallelMode
+                               && node->filters)
+                       {
+                               if (!PassByBloomFilter(&node->ss.ps, 
node->filters, slot))
+                                       continue;
+                       }
                        break;
+               }
 
                /* No more tuples from this partition. Move to next one. */
                CleanupOnePartition(node);
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 53e55e26f91..62d3c2da790 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -4161,7 +4161,9 @@ PushdownRuntimeFilter(HashState *node)
                scankeys = NIL;
 
                attr_filter = lfirst(lc);
-               if (!IsA(attr_filter->target, SeqScanState) || 
attr_filter->empty)
+               if (attr_filter->empty ||
+                       (!IsA(attr_filter->target, SeqScanState) &&
+                        !IsA(attr_filter->target, DynamicSeqScanState)))
                        continue;
 
                /* bloom filter */
@@ -4190,8 +4192,21 @@ PushdownRuntimeFilter(HashState *node)
                scankeys = lappend(scankeys, sk);
 
                /* append new runtime filters to target node */
-               SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
-               sss->filters = list_concat(sss->filters, scankeys);
+               if (IsA(attr_filter->target, SeqScanState))
+               {
+                       SeqScanState *sss = castNode(SeqScanState, 
attr_filter->target);
+                       sss->filters = list_concat(sss->filters, scankeys);
+               }
+               else if (IsA(attr_filter->target, DynamicSeqScanState))
+               {
+                       DynamicSeqScanState *dsss = 
castNode(DynamicSeqScanState, attr_filter->target);
+                       dsss->filters = list_concat(dsss->filters, scankeys);
+               }
+               else
+               {
+                       /* never reach here */
+                       pg_unreachable();
+               }
        }
 }
 
@@ -4250,6 +4265,7 @@ ResetRuntimeFilter(HashState *node)
        ListCell                *lc;
        AttrFilter              *attr_filter;
        SeqScanState    *sss;
+       DynamicSeqScanState *dsss;
 
        if (!node->filters)
                return;
@@ -4268,6 +4284,20 @@ ResetRuntimeFilter(HashState *node)
                                sss->filters = NIL;
                        }
                }
+               else if (IsA(attr_filter->target, DynamicSeqScanState))
+               {
+                       dsss = castNode(DynamicSeqScanState, 
attr_filter->target);
+                       if (dsss->filters)
+                       {
+                               list_free_deep(dsss->filters);
+                               dsss->filters = NIL;
+                       }
+               }
+               else
+               {
+                       /* never reach here */
+                       pg_unreachable();
+               }
 
                if (attr_filter->blm_filter)
                        bloom_free(attr_filter->blm_filter);
diff --git a/src/backend/executor/nodeHashjoin.c 
b/src/backend/executor/nodeHashjoin.c
index 2a60c777184..b14446dc953 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -2239,7 +2239,7 @@ CreateRuntimeFilter(HashJoinState* hjstate)
                foreach(lc, targets)
                {
                        PlanState *target = lfirst(lc);
-                       Assert(IsA(target, SeqScanState));
+                       Assert(IsA(target, SeqScanState) || IsA(target, 
DynamicSeqScanState));
 
                        attr_filter = CreateAttrFilter(target, lattno, rattno,
                                        hstate->ps.plan->plan_rows);
@@ -2334,7 +2334,7 @@ CheckTargetNode(PlanState *node, AttrNumber attno, 
AttrNumber *lattno)
        Var *var;
        TargetEntry *te;
 
-       if (!IsA(node, SeqScanState))
+       if (!IsA(node, SeqScanState) && !IsA(node, DynamicSeqScanState))
                return false;
 
        te = (TargetEntry *)list_nth(node->plan->targetlist, attno - 1);
@@ -2375,16 +2375,14 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber 
attno, AttrNumber *lattno)
        targetNodes = NIL;
        while (true)
        {
-               /* target is seqscan */
-               if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) && 
IsA(child, SeqScanState))
+               /* target is seqscan or dynamic seqscan */
+               if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) && 
+                       (IsA(child, SeqScanState) || IsA(child, 
DynamicSeqScanState)))
                {
                        /*
                         * hashjoin
-                        *   seqscan
-                        * or
-                        * hashjoin
-                        *   result
-                        *     seqscan
+                        *   [result]
+                        *     seqscan | dynamicseqscan
                         */
                        if (!CheckTargetNode(child, attno, lattno))
                                return NULL;
diff --git a/src/backend/executor/nodeSeqscan.c 
b/src/backend/executor/nodeSeqscan.c
index 94270b8231b..c9debfb1fd0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -42,8 +42,6 @@
 #include "cdb/cdbvars.h"
 
 static TupleTableSlot *SeqNext(SeqScanState *node);
-
-static bool PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot);
 static ScanKey ScanKeyListToArray(List *keys, int *num);
 
 /* ----------------------------------------------------------------
@@ -104,7 +102,7 @@ SeqNext(SeqScanState *node)
        {
                while (table_scan_getnextslot(scandesc, direction, slot))
                {
-                       if (!PassByBloomFilter(node, slot))
+                       if (!PassByBloomFilter(&node->ss.ps, node->filters, 
slot))
                                continue;
 
                        return slot;
@@ -405,8 +403,8 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
 /*
  * Returns true if the element may be in the bloom filter.
  */
-static bool
-PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
+bool
+PassByBloomFilter(PlanState *ps, List *filters, TupleTableSlot *slot)
 {
        ScanKey sk;
        Datum   val;
@@ -417,12 +415,10 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot 
*slot)
        /*
         * Mark that the pushdown runtime filter is actually taking effect.
         */
-       if (node->ss.ps.instrument &&
-               !node->ss.ps.instrument->prf_work &&
-               list_length(node->filters))
-               node->ss.ps.instrument->prf_work = true;
+       if (ps->instrument && !ps->instrument->prf_work && list_length(filters))
+               ps->instrument->prf_work = true;
 
-       foreach (lc, node->filters)
+       foreach (lc, filters)
        {
                sk = lfirst(lc);
                if (sk->sk_flags != SK_BLOOM_FILTER)
@@ -435,7 +431,7 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot)
                blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument);
                if (bloom_lacks_element(blm_filter, (unsigned char *)&val, 
sizeof(Datum)))
                {
-                       InstrCountFilteredPRF(node, 1);
+                       InstrCountFilteredPRF(ps, 1);
                        return false;
                }
        }
diff --git a/src/include/executor/nodeSeqscan.h 
b/src/include/executor/nodeSeqscan.h
index 170286a8d5e..efc712dc4b7 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -30,4 +30,6 @@ extern void ExecSeqScanReInitializeDSM(SeqScanState *node, 
ParallelContext *pcxt
 extern void ExecSeqScanInitializeWorker(SeqScanState *node,
                                                                                
ParallelWorkerContext *pwcxt);
 
+extern bool PassByBloomFilter(PlanState *ps, List *filters, TupleTableSlot 
*slot);
+
 #endif                                                 /* NODESEQSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e673dd47736..ede431f9ff3 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2231,6 +2231,9 @@ typedef struct DynamicSeqScanState
        struct PartitionPruneState *as_prune_state; /* partition dynamic 
pruning state */
        Bitmapset  *as_valid_subplans; /* used to determine partitions during 
dynamic pruning*/
        bool            did_pruning; /* flag that is set once dynamic pruning 
is performed */
+
+       /* runtime filter support */
+       List            *filters;                       /* the list of struct 
ScanKeyData for runtime filters */
 } DynamicSeqScanState;
 
 /*
diff --git a/src/test/regress/expected/gp_runtime_filter.out 
b/src/test/regress/expected/gp_runtime_filter.out
index cc1531a707c..32287b63d01 100644
--- a/src/test/regress/expected/gp_runtime_filter.out
+++ b/src/test/regress/expected/gp_runtime_filter.out
@@ -535,6 +535,60 @@ RESET gp_enable_runtime_filter_pushdown;
 DROP TABLE IF EXISTS t1;
 DROP TABLE IF EXISTS t2;
 DROP TABLE IF EXISTS t3;
+-- case 7: scan partition table with dynamic scan
+CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) 
(START (1) END (100) EVERY (50));
+CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED;
+INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5);
+INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51);
+ANALYZE;
+SET optimizer TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN                             
            
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=96 loops=1)
+   ->  Hash Join (actual rows=64 loops=1)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 6 of 
524288 buckets.
+         ->  Dynamic Seq Scan on t1 (actual rows=608 loops=1)
+               Number of partitions to scan: 2 (out of 2)
+               Partitions scanned:  Avg 2.0 x 3 workers.  Max 2 parts (seg0).
+         ->  Hash (actual rows=6 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4097kB
+               ->  Partition Selector (selector id: $0) (actual rows=6 loops=1)
+                     ->  Seq Scan on t2 (actual rows=6 loops=1)
+ Optimizer: GPORCA
+(12 rows)
+
+SET gp_enable_runtime_filter_pushdown TO on;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+                                        QUERY PLAN                             
            
+-------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3) (actual rows=96 loops=1)
+   ->  Hash Join (actual rows=64 loops=1)
+         Hash Cond: (t1.c2 = t2.c2)
+         Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 6 of 
524288 buckets.
+         ->  Dynamic Seq Scan on t1 (actual rows=64 loops=1)
+               Rows Removed by Pushdown Runtime Filter: 544
+               Number of partitions to scan: 2 (out of 2)
+               Partitions scanned:  Avg 2.0 x 3 workers.  Max 2 parts (seg0).
+         ->  Hash (actual rows=6 loops=1)
+               Buckets: 524288  Batches: 1  Memory Usage: 4097kB
+               ->  Partition Selector (selector id: $0) (actual rows=6 loops=1)
+                     ->  Seq Scan on t2 (actual rows=6 loops=1)
+ Optimizer: GPORCA
+(13 rows)
+
+RESET gp_enable_runtime_filter_pushdown;
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+SET optimizer TO off;
 RESET enable_parallel;
 -- Clean up: reset guc
 SET gp_enable_runtime_filter TO off;
diff --git a/src/test/regress/sql/gp_runtime_filter.sql 
b/src/test/regress/sql/gp_runtime_filter.sql
index fc1fe487745..6a0c6e0cafb 100644
--- a/src/test/regress/sql/gp_runtime_filter.sql
+++ b/src/test/regress/sql/gp_runtime_filter.sql
@@ -227,6 +227,35 @@ DROP TABLE IF EXISTS t1;
 DROP TABLE IF EXISTS t2;
 DROP TABLE IF EXISTS t3;
 
+-- case 7: scan partition table with dynamic scan
+CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) 
(START (1) END (100) EVERY (50));
+CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED;
+INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99);
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t1 SELECT * FROM t1;
+INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5);
+INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51);
+ANALYZE;
+
+SET optimizer TO on;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+
+SET gp_enable_runtime_filter_pushdown TO on;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2;
+
+RESET gp_enable_runtime_filter_pushdown;
+
+DROP TABLE IF EXISTS t1;
+DROP TABLE IF EXISTS t2;
+
+SET optimizer TO off;
+
 RESET enable_parallel;
 
 -- Clean up: reset guc


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to