This is an automated email from the ASF dual-hosted git repository.

yjhjstz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit d0cc0909822f303b601e1c4d8d2be3b2fa8d3715
Author: Sasasu <[email protected]>
AuthorDate: Tue Mar 15 16:07:56 2022 +0800

    Dispatch temporary namespace id to all Gangs.
    
    If a query has multiple Gangs, the secondary Gang cannot get the correct
    temporary namespace ID.
    
    Consider this SQL:
    
    ```
        create table tn_a(id int) distributed by (id); insert into tn_a values 
(1), (2);
        create temp table tn_a_tmp(a int) distributed replicated; insert into 
tn_a_tmp values(1);
    
        create or replace function fun(sql text, a oid) returns bigint AS '
            return plpy.execute(sql).nrows() + a
        ' language plpython3u stable;
    
        create table tn_a_new as with c as (
            select fun('select * from tn_a_tmp', s.id) from tn_a s
        ) select 1 from c;
    ```
    
    The key is for a reader gang to access the temp table via text-SQL
    execution; this can only happen when a UDF executes text SQL and the
    temp table is replicated.
    
    In this scenario, the first `create temp table` creates a temporary
    schema and is then dispatched to the primary QE, so both QD and QE have
    the correct myTempNamespaceID. When the 2-Gang query executes, the
    secondary Gang is booted dynamically with myTempNamespaceID = InvalidOid.
    
    As a result, when the secondary Gang scans the temporary table, it throws
    a 'table does not exist' error because it never checks the temporary schema.
    
    The fix follows the same approach GPDB uses for numsegments: we
    re-dispatch the temporary namespace OIDs with every query by adding
    8 bytes to the MPP query message. As in PG, the temporary namespace OID
    cannot be changed after it has been set.
    
    Signed-off-by: Sasasu <[email protected]>
---
 src/backend/catalog/namespace.c            | 27 +++++++++++++++++++++-
 src/backend/cdb/dispatcher/cdbdisp_query.c | 18 +++++++++++++--
 src/backend/tcop/postgres.c                | 10 +++++++++
 src/include/catalog/namespace.h            |  2 ++
 src/test/regress/expected/bfv_temp.out     | 35 +++++++++++++++++++++++++++++
 src/test/regress/sql/bfv_temp.sql          | 36 +++++++++++++++++++++++++++++-
 6 files changed, 124 insertions(+), 4 deletions(-)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 6c5891ce2d..9a3b347aa0 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -3421,6 +3421,8 @@ GetTempToastNamespace(void)
  *
  * This is used for conveying state to a parallel worker, and is not meant
  * for general-purpose access.
+ *
+ * GPDB: also used when dispatching MPP queries.
  */
 void
 GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
@@ -3460,6 +3462,28 @@ SetTempNamespaceState(Oid tempNamespaceId, Oid 
tempToastNamespaceId)
        baseSearchPathValid = false;    /* may need to rebuild list */
 }
 
+/*
+ * Like SetTempNamespaceState, but for a backend that is already running.
+ *
+ * GPDB: used on QEs to adopt the temp namespace OIDs dispatched per query.
+ */
+void
+SetTempNamespaceStateAfterBoot(Oid tempNamespaceId, Oid tempToastNamespaceId)
+{
+       /* same as PG, cannot switch to another temp namespace dynamically */
+       Assert(myTempNamespace == InvalidOid || myTempNamespace == tempNamespaceId);
+       Assert(myTempToastNamespace == InvalidOid || myTempToastNamespace == tempToastNamespaceId);
+
+       /* if both OIDs are already set, baseSearchPath is still valid */
+       if (myTempNamespace == tempNamespaceId && myTempToastNamespace == tempToastNamespaceId)
+               return;
+
+       myTempNamespace = tempNamespaceId;
+       myTempToastNamespace = tempToastNamespaceId;
+
+       baseSearchPathValid = false;    /* need to rebuild list */
+}
+
 
 /*
  * GetOverrideSearchPath - fetch current search path definition in form
@@ -4353,6 +4377,7 @@ ResetTempNamespace(void)
        cancel_before_shmem_exit_if_matched(RemoveTempRelationsCallback, 0);
 
        myTempNamespace = InvalidOid;
+       myTempToastNamespace = InvalidOid;
        myTempNamespaceSubID = InvalidSubTransactionId;
        baseSearchPathValid = false;    /* need to rebuild list */
 
@@ -4726,7 +4751,7 @@ TempNamespaceValid(bool error_if_removed)
                if (SearchSysCacheExists1(NAMESPACEOID,
                                                                  
ObjectIdGetDatum(myTempNamespace)))
                        return true;
-               else if (Gp_role != GP_ROLE_EXECUTE && error_if_removed) 
+               else if (Gp_role != GP_ROLE_EXECUTE && error_if_removed)
                {
                        /*
                         * We might call this on QEs if we're dropping our own
diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c 
b/src/backend/cdb/dispatcher/cdbdisp_query.c
index 7d35bb3dae..e9eb99d0fb 100644
--- a/src/backend/cdb/dispatcher/cdbdisp_query.c
+++ b/src/backend/cdb/dispatcher/cdbdisp_query.c
@@ -26,6 +26,7 @@
 #include "cdb/cdbmutate.h"
 #include "cdb/cdbsrlz.h"
 #include "cdb/tupleremap.h"
+#include "catalog/namespace.h" /* for GetTempNamespaceState() */
 #include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "tcop/tcopprot.h"
@@ -869,6 +870,7 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
        Oid                     currentUserId = GetUserId();
        int32           numsegments = getgpsegmentCount();
        StringInfoData resgroupInfo;
+       Oid                     tempNamespaceId, tempToastNamespaceId;
 
        int                     tmp,
                                len;
@@ -920,7 +922,10 @@ buildGpQueryString(DispatchCommandQueryParms *pQueryParms,
                sddesc_len +
                sizeof(numsegments) +
                sizeof(resgroupInfo.len) +
-               resgroupInfo.len;
+               resgroupInfo.len +
+               sizeof(tempNamespaceId) +
+               sizeof(tempToastNamespaceId) +
+               0;
 
        shared_query = palloc(total_query_len);
 
@@ -1015,11 +1020,20 @@ buildGpQueryString(DispatchCommandQueryParms 
*pQueryParms,
                pos += resgroupInfo.len;
        }
 
-       len = pos - shared_query - 1;
+       /* in-process variable for temporary namespace */
+       GetTempNamespaceState(&tempNamespaceId, &tempToastNamespaceId);
+       tempNamespaceId = htonl(tempNamespaceId);
+       tempToastNamespaceId = htonl(tempToastNamespaceId);
+
+       memcpy(pos, &tempNamespaceId, sizeof(tempNamespaceId));
+       pos += sizeof(tempNamespaceId);
+       memcpy(pos, &tempToastNamespaceId, sizeof(tempToastNamespaceId));
+       pos += sizeof(tempToastNamespaceId);
 
        /*
         * fill in length placeholder
         */
+       len = pos - shared_query - 1;
        tmp = htonl(len);
        memcpy(shared_query + 1, &tmp, sizeof(len));
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 00068c284c..5c1f6f56ba 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -44,6 +44,7 @@
 #include "access/xact.h"
 #include "catalog/oid_dispatch.h"
 #include "catalog/pg_type.h"
+#include "catalog/namespace.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
 #include "crypto/bufenc.h"
@@ -5712,6 +5713,15 @@ PostgresMain(int argc, char *argv[],
                                        if (resgroupInfoLen > 0)
                                                resgroupInfoBuf = 
pq_getmsgbytes(&input_message, resgroupInfoLen);
 
+                                       /* in-process variable for temporary 
namespace */
+                                       {
+                                               Oid                     
tempNamespaceId, tempToastNamespaceId;
+
+                                               tempNamespaceId = 
pq_getmsgint(&input_message, sizeof(tempNamespaceId));
+                                               tempToastNamespaceId = 
pq_getmsgint(&input_message, sizeof(tempToastNamespaceId));
+                                               
SetTempNamespaceStateAfterBoot(tempNamespaceId, tempToastNamespaceId);
+                                       }
+
                                        pq_getmsgend(&input_message);
 
                                        elog((Debug_print_full_dtm ? LOG : 
DEBUG5), "MPP dispatched stmt from QD: %s.",query_string);
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 3febca7260..1d8c4340f4 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -167,6 +167,8 @@ extern void GetTempNamespaceState(Oid *tempNamespaceId,
                                                                  Oid 
*tempToastNamespaceId);
 extern void SetTempNamespaceState(Oid tempNamespaceId,
                                                                  Oid 
tempToastNamespaceId);
+extern void SetTempNamespaceStateAfterBoot(Oid tempNamespaceId,
+                                                                 Oid 
tempToastNamespaceId); /* GPDB only */
 extern void ResetTempTableNamespace(void);
 
 extern OverrideSearchPath *GetOverrideSearchPath(MemoryContext context);
diff --git a/src/test/regress/expected/bfv_temp.out 
b/src/test/regress/expected/bfv_temp.out
index a1bfe5d451..989279dc05 100644
--- a/src/test/regress/expected/bfv_temp.out
+++ b/src/test/regress/expected/bfv_temp.out
@@ -64,3 +64,38 @@ reset role;
 drop table temp_nspnames;
 drop function public.sec_definer_create_test();
 drop role sec_definer_role;
+-- Check if myTempNamespace is correct in N-Gang query.
+create table tn_a(id int) distributed by (id);
+create temp table tn_a_tmp(a int) distributed replicated;
+insert into tn_a values (1), (2);
+insert into tn_a_tmp values(1);
+create or replace function fun(sql text, a oid) returns bigint AS 'return 
plpy.execute(sql).nrows() + a' language plpython3u stable;
+create table tn_a_new as with c as (select fun('select * from tn_a_tmp', s.id) 
from tn_a s) select 1 from c;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'?column?' as the Greenplum Database data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+drop table tn_a;
+drop table tn_a_tmp;
+drop table tn_a_new;
+-- Check if old gang can accept new temp schema, after temp schema changed on 
coordinator
+\c
+create table tn_b_a(id int) distributed by (id);
+create table tn_b_b(id int, a_id int) distributed by (id);
+insert into tn_b_a values (1), (2);
+insert into tn_b_b values (3, 1), (4, 2);
+select a.id, b.id from tn_b_a a, tn_b_b b where a.id = b.a_id order by 1, 2;
+ id | id 
+----+----
+  1 |  3
+  2 |  4
+(2 rows)
+
+create temp table tn_b_temp(a int) distributed replicated;
+insert into tn_b_temp values(1);
+create table tn_b_new as with c as (select fun('select * from tn_b_temp', 
s.id) from tn_b_b s) select 1 from c;
+NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 
'?column?' as the Greenplum Database data distribution key for this table.
+HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make 
sure column(s) chosen are the optimal data distribution key to minimize skew.
+drop table tn_b_a;
+drop table tn_b_b;
+drop table tn_b_temp;
+drop table tn_b_new;
+drop function fun(sql text, a oid);
diff --git a/src/test/regress/sql/bfv_temp.sql 
b/src/test/regress/sql/bfv_temp.sql
index c1aa90653c..4df911099d 100644
--- a/src/test/regress/sql/bfv_temp.sql
+++ b/src/test/regress/sql/bfv_temp.sql
@@ -44,9 +44,43 @@ where nsp.nspname = temp_nspnames.nspname OR nsp.nspname = 
temp_nspnames.toastns
 -- gone if the whole namespace is gone, but doesn't hurt to check.)
 select * from pg_tables where tablename = 'wmt_toast_issue_temp';
 
-
 -- Clean up
 reset role;
 drop table temp_nspnames;
 drop function public.sec_definer_create_test();
 drop role sec_definer_role;
+
+-- Check if myTempNamespace is correct in N-Gang query.
+create table tn_a(id int) distributed by (id);
+create temp table tn_a_tmp(a int) distributed replicated;
+
+insert into tn_a values (1), (2);
+insert into tn_a_tmp values(1);
+
+create or replace function fun(sql text, a oid) returns bigint AS 'return 
plpy.execute(sql).nrows() + a' language plpython3u stable;
+
+create table tn_a_new as with c as (select fun('select * from tn_a_tmp', s.id) 
from tn_a s) select 1 from c;
+
+drop table tn_a;
+drop table tn_a_tmp;
+drop table tn_a_new;
+
+-- Check if old gang can accept new temp schema, after temp schema changed on 
coordinator
+\c
+create table tn_b_a(id int) distributed by (id);
+create table tn_b_b(id int, a_id int) distributed by (id);
+
+insert into tn_b_a values (1), (2);
+insert into tn_b_b values (3, 1), (4, 2);
+select a.id, b.id from tn_b_a a, tn_b_b b where a.id = b.a_id order by 1, 2;
+
+create temp table tn_b_temp(a int) distributed replicated;
+insert into tn_b_temp values(1);
+
+create table tn_b_new as with c as (select fun('select * from tn_b_temp', 
s.id) from tn_b_b s) select 1 from c;
+
+drop table tn_b_a;
+drop table tn_b_b;
+drop table tn_b_temp;
+drop table tn_b_new;
+drop function fun(sql text, a oid);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to