This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 142353684b2c697a0705cff489e0aaf54e2dcc61
Author: Chris Hajas <[email protected]>
AuthorDate: Wed Feb 8 09:57:09 2023 -0800

    Fix COPY when executed via fdw on coordinator as executor (#14846)
    
    Previously, when using a foreign table that was created with the file
    fdw under a redistribute and the execution was on the coordinator, the
    query would hang. This happens because we were using the COPY_EXECUTOR
    dispatch mode, which is for receiving pre-processed data from QD.
    Instead, we should be using COPY_DIRECT, which is used with COPY " ON
    SEGMENT running on a segment, or utility mode, or non-distributed table
    in QD."
    
    This fixes plans such as:
    ```
    test=# explain select * from f_table, bar where a=c;                        
                                                                                
                                                                                
                                                     QUERY PLAN
    
---------------------------------------------------------------------------------
     Gather Motion 3:1  (slice1; segments: 3)  (cost=1.04..2.22 rows=3 width=16)
       ->  Hash Join  (cost=1.04..2.17 rows=1 width=16)
             Hash Cond: (f_table.a = bar.c)
             ->  Redistribute Motion 1:3  (slice2)  (cost=0.00..1.12 rows=1 
width=8)
                   Hash Key: f_table.a
                   ->  Foreign Scan on f_table  (cost=0.00..1.10 rows=1 width=8)
                         Foreign File: /tmp/test.csv
                         Foreign File Size: 8 b
             ->  Hash  (cost=1.02..1.02 rows=2 width=8)
                   ->  Seq Scan on bar  (cost=0.00..1.02 rows=2 width=8)
     Optimizer: Postgres query optimizer
    (11 rows)
    ```
---
 contrib/file_fdw/input/gp_file_fdw.source          |  20 +++
 contrib/file_fdw/output/gp_file_fdw.source         |  91 ++++++++++++-
 .../file_fdw/output/gp_file_fdw_optimizer.source   | 145 +++++++++++++++++++++
 src/backend/commands/copyfrom.c                    |   8 ++
 4 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/contrib/file_fdw/input/gp_file_fdw.source 
b/contrib/file_fdw/input/gp_file_fdw.source
index 908dfeab9a..c7548de98c 100644
--- a/contrib/file_fdw/input/gp_file_fdw.source
+++ b/contrib/file_fdw/input/gp_file_fdw.source
@@ -38,8 +38,28 @@ CREATE FOREIGN TABLE text_csv_any_from_server (
 ) SERVER file_server
 OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv');
 SELECT * FROM text_csv_any_from_server;
+CREATE FOREIGN TABLE text_csv_coordinator (
+    word1 text, word2 text, a int, b int
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv', mpp_execute 
'coordinator');
+
+-- Test append works both ways and generates valid plans. Should be able to 
execute
+-- coordinator fdw on coordinator under redistribute
+explain select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+
+explain select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+
+-- Test join with foreign scan under redistribute on coordinator works and 
doesn't hang
+create table bar (a text);
+insert into bar values ('AAA'),('XYZ'),('hji');
+analyze bar;
+explain  select word1 from text_csv_coordinator ft1, bar where ft1.word1 = 
bar.a;
+select word1 from text_csv_coordinator ft1, bar where ft1.word1 = bar.a;
 
 -- cleanup
+DROP TABLE bar;
 RESET ROLE;
 DROP EXTENSION file_fdw CASCADE;
 DROP ROLE file_fdw_superuser;
diff --git a/contrib/file_fdw/output/gp_file_fdw.source 
b/contrib/file_fdw/output/gp_file_fdw.source
index 59c2bbcb07..4f8c18f88e 100644
--- a/contrib/file_fdw/output/gp_file_fdw.source
+++ b/contrib/file_fdw/output/gp_file_fdw.source
@@ -43,13 +43,102 @@ CREATE FOREIGN TABLE text_csv_any_from_server (
 OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv');
 SELECT * FROM text_csv_any_from_server;
 ERROR:  file_fdw does not support mpp_execute option 'any'
+CREATE FOREIGN TABLE text_csv_coordinator (
+    word1 text, word2 text, a int, b int
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv', mpp_execute 
'coordinator');
+-- Test append works both ways and generates valid plans. Should be able to 
execute
+-- coordinator fdw on coordinator under redistribute
+explain select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
+ Append  (cost=0.00..550.56 rows=10442 width=32)
+   ->  Foreign Scan on text_csv_coordinator  (cost=0.00..1.15 rows=2 width=32)
+         Foreign File: @abs_srcdir@/data/text.csv
+         Foreign File Size: 86 b
+   ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..497.20 rows=10440 
width=32)
+         ->  Foreign Scan on text_csv_all  (cost=0.00..358.00 rows=3480 
width=32)
+               Foreign File: @abs_srcdir@/data/text<SEGID>.csv
+ Optimizer: Postgres query optimizer
+(8 rows)
+
+select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+ word1 
+-------
+ AAA
+ XYZ
+ NULL
+ NULL
+ ABC
+ AAA
+ BBB
+ FOO
+(8 rows)
+
+explain select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
+ Append  (cost=0.00..550.56 rows=10442 width=32)
+   ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..497.20 rows=10440 
width=32)
+         ->  Foreign Scan on text_csv_all  (cost=0.00..358.00 rows=3480 
width=32)
+               Foreign File: @abs_srcdir@/file_fdw/data/text<SEGID>.csv
+   ->  Foreign Scan on text_csv_coordinator  (cost=0.00..1.15 rows=2 width=32)
+         Foreign File: @abs_srcdir@/file_fdw/data/text.csv
+         Foreign File Size: 86 b
+ Optimizer: Postgres query optimizer
+(8 rows)
+
+select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+ word1 
+-------
+ AAA
+ BBB
+ FOO
+ AAA
+ XYZ
+ NULL
+ NULL
+ ABC
+(8 rows)
+
+-- Test join with foreign scan under redistribute on coordinator works and 
doesn't hang
+create table bar (a text);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Greenplum Database data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into bar values ('AAA'),('XYZ'),('hji');
+analyze bar;
+explain  select word1 from text_csv_coordinator ft1, bar where ft1.word1 = 
bar.a;
+                                          QUERY PLAN                           
                
+-----------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)  (cost=1.02..2.26 rows=3 width=32)
+   ->  Hash Join  (cost=1.02..2.22 rows=1 width=32)
+         Hash Cond: (ft1.word1 = bar.a)
+         ->  Redistribute Motion 1:3  (slice2)  (cost=0.00..1.18 rows=1 
width=32)
+               Hash Key: ft1.word1
+               ->  Foreign Scan on text_csv_coordinator ft1  (cost=0.00..1.15 
rows=2 width=32)
+                     Foreign File: @abs_srcdir@/file_fdw/data/text.csv
+                     Foreign File Size: 86 b
+         ->  Hash  (cost=1.01..1.01 rows=1 width=4)
+               ->  Seq Scan on bar  (cost=0.00..1.01 rows=1 width=4)
+ Optimizer: Postgres query optimizer
+(11 rows)
+
+select word1 from text_csv_coordinator ft1, bar where ft1.word1 = bar.a;
+ word1 
+-------
+ AAA
+ XYZ
+(2 rows)
+
 -- cleanup
+DROP TABLE bar;
 RESET ROLE;
 DROP EXTENSION file_fdw CASCADE;
-NOTICE:  drop cascades to 5 other objects
+NOTICE:  drop cascades to 6 other objects
 DETAIL:  drop cascades to server file_server
 drop cascades to user mapping for file_fdw_superuser on server file_server
 drop cascades to foreign table text_csv_any
 drop cascades to foreign table text_csv_all
 drop cascades to foreign table text_csv_any_from_server
+drop cascades to foreign table text_csv_coordinator
 DROP ROLE file_fdw_superuser;
diff --git a/contrib/file_fdw/output/gp_file_fdw_optimizer.source 
b/contrib/file_fdw/output/gp_file_fdw_optimizer.source
new file mode 100644
index 0000000000..7276576c2b
--- /dev/null
+++ b/contrib/file_fdw/output/gp_file_fdw_optimizer.source
@@ -0,0 +1,145 @@
+--
+-- Test foreign-data wrapper file_fdw. Greenplum MPP specific
+--
+-- Clean up in case a prior regression run failed
+SET client_min_messages TO 'error';
+SET optimizer_trace_fallback = on;
+DROP ROLE IF EXISTS file_fdw_superuser, file_fdw_user, no_priv_user;
+RESET client_min_messages;
+CREATE ROLE file_fdw_superuser LOGIN SUPERUSER; -- is a superuser
+-- Install file_fdw
+CREATE EXTENSION file_fdw;
+-- file_fdw_superuser owns fdw-related objects
+SET ROLE file_fdw_superuser;
+CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw OPTIONS (mpp_execute 
'any');
+-- create user mappings and grant privilege to test users
+SET ROLE file_fdw_superuser;
+CREATE USER MAPPING FOR file_fdw_superuser SERVER file_server OPTIONS 
(mpp_execute 'master'); -- error
+ERROR:  invalid option "mpp_execute"
+HINT:  There are no valid options in this context.
+CREATE USER MAPPING FOR file_fdw_superuser SERVER file_server;
+-- MPP tests
+CREATE FOREIGN TABLE text_csv_any (
+    word1 text, word2 text
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv', mpp_execute 
'any');
+SELECT * FROM text_csv_any;
+ERROR:  file_fdw does not support mpp_execute option 'any'
+CREATE FOREIGN TABLE text_csv_all (
+    word1 text, word2 text
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text<SEGID>.csv', 
mpp_execute 'all segments');
+SELECT * FROM text_csv_all ORDER BY word1;
+ word1 | word2 
+-------+-------
+ AAA   | aaa
+ BBB   | abc
+ FOO   | bar
+(3 rows)
+
+CREATE FOREIGN TABLE text_csv_any_from_server (
+    word1 text, word2 text
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv');
+SELECT * FROM text_csv_any_from_server;
+ERROR:  file_fdw does not support mpp_execute option 'any'
+CREATE FOREIGN TABLE text_csv_coordinator (
+    word1 text, word2 text, a int, b int
+) SERVER file_server
+OPTIONS (format 'csv', filename '@abs_srcdir@/data/text.csv', mpp_execute 
'coordinator');
+-- Test append works both ways and generates valid plans. Should be able to 
execute
+-- coordinator fdw on coordinator under redistribute
+explain select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
+ Append  (cost=0.00..862.00 rows=1 width=8)
+   ->  Foreign Scan on text_csv_coordinator  (cost=0.00..431.00 rows=1 width=8)
+         Foreign File: @abs_srcdir@/data/text.csv
+         Foreign File Size: 86 b
+   ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..431.00 rows=1 
width=8)
+         ->  Foreign Scan on text_csv_all  (cost=0.00..431.00 rows=1 width=8)
+               Foreign File: @abs_srcdir@/data/text<SEGID>.csv
+ Optimizer: Pivotal Optimizer (GPORCA)
+(8 rows)
+
+select word1 from text_csv_coordinator union all select word1 from 
text_csv_all;
+ word1 
+-------
+ AAA
+ XYZ
+ NULL
+ NULL
+ ABC
+ AAA
+ BBB
+ FOO
+(8 rows)
+
+explain select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+                                           QUERY PLAN                          
                 
+------------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=8)
+   ->  Append  (cost=0.00..862.00 rows=1 width=8)
+         ->  Foreign Scan on text_csv_all  (cost=0.00..431.00 rows=1 width=8)
+               Foreign File: @abs_srcdir@/data/text<SEGID>.csv
+         ->  Redistribute Motion 1:3  (slice2)  (cost=0.00..431.00 rows=1 
width=8)
+               ->  Foreign Scan on text_csv_coordinator  (cost=0.00..431.00 
rows=1 width=8)
+                     Foreign File: @abs_srcdir@/data/text.csv
+                     Foreign File Size: 86 b
+ Optimizer: Pivotal Optimizer (GPORCA)
+(9 rows)
+
+select word1 from text_csv_all union all select word1 from 
text_csv_coordinator;
+ word1 
+-------
+ AAA
+ BBB
+ FOO
+ AAA
+ XYZ
+ NULL
+ NULL
+ ABC
+(8 rows)
+
+-- Test join with foreign scan under redistribute on coordinator works and 
doesn't hang
+create table bar (a text);
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' 
as the Greenplum Database data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+insert into bar values ('AAA'),('XYZ'),('hji');
+analyze bar;
+explain  select word1 from text_csv_coordinator ft1, bar where ft1.word1 = 
bar.a;
+                                          QUERY PLAN                           
                
+-----------------------------------------------------------------------------------------------
+ Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=8)
+   ->  Hash Join  (cost=0.00..862.00 rows=1 width=8)
+         Hash Cond: (word1 = bar.a)
+         ->  Redistribute Motion 1:3  (slice2)  (cost=0.00..431.00 rows=1 
width=8)
+               Hash Key: word1
+               ->  Foreign Scan on text_csv_coordinator  (cost=0.00..431.00 
rows=1 width=8)
+                     Foreign File: @abs_srcdir@/file_fdw/data/text.csv
+                     Foreign File Size: 86 b
+         ->  Hash  (cost=431.00..431.00 rows=1 width=4)
+               ->  Seq Scan on bar  (cost=0.00..431.00 rows=1 width=4)
+ Optimizer: Pivotal Optimizer (GPORCA)
+(11 rows)
+
+select word1 from text_csv_coordinator ft1, bar where ft1.word1 = bar.a;
+ word1 
+-------
+ AAA
+ XYZ
+(2 rows)
+
+-- cleanup
+DROP TABLE bar;
+RESET ROLE;
+DROP EXTENSION file_fdw CASCADE;
+NOTICE:  drop cascades to 6 other objects
+DETAIL:  drop cascades to server file_server
+drop cascades to user mapping for file_fdw_superuser on server file_server
+drop cascades to foreign table text_csv_any
+drop cascades to foreign table text_csv_all
+drop cascades to foreign table text_csv_any_from_server
+drop cascades to foreign table text_csv_coordinator
+DROP ROLE file_fdw_superuser;
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 004f1dd038..712951cb66 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1410,6 +1410,14 @@ BeginCopyFromDirectoryTable(ParseState *pstate,
                cstate->rel && cstate->rel->rd_cdbpolicy &&
                cstate->rel->rd_cdbpolicy->ptype != POLICYTYPE_ENTRY)
                cstate->dispatch_mode = COPY_DISPATCH;
+       /*
+        * Handle case where fdw executes on coordinator while it's acting as a 
segment
+        * This occurs when fdw is under a redistribute on the coordinator
+        */
+       else if (Gp_role == GP_ROLE_EXECUTE &&
+                        cstate->rel && cstate->rel->rd_cdbpolicy &&
+                        cstate->rel->rd_cdbpolicy->ptype == POLICYTYPE_ENTRY)
+               cstate->dispatch_mode = COPY_DIRECT;
        else if (Gp_role == GP_ROLE_EXECUTE)
                cstate->dispatch_mode = COPY_EXECUTOR;
        else


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to