This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit 7947f7efbf483d03871bff2fd0edabdc9e7de602
Author: ztao1987 <zhenglin.ta...@gmail.com>
AuthorDate: Mon Dec 13 22:18:51 2021 +0800

    HAWQ-1811. Sync with OushuDB - Phase II
---
 contrib/hornet/orc_debug_statistics.py             |  11 +-
 contrib/magma/magma.c                              | 153 ++++++++++++--------
 src/backend/access/common/reloptions.c             |   1 +
 src/backend/access/transam/xact.c                  | 139 ++++++++++++------
 src/backend/catalog/aoseg.c                        | 158 ++++++++++++++++++++-
 src/backend/catalog/heap.c                         | 121 ++++++++--------
 src/backend/catalog/index.c                        |  17 ++-
 src/backend/catalog/pg_compression.c               |   3 +-
 src/backend/cdb/cdbdatalocality.c                  |  16 ++-
 src/backend/cdb/cdbquerycontextdispatching.c       |   4 +-
 src/backend/cdb/dispatcher.c                       |   3 +
 src/backend/cdb/dispatcher_new.c                   |   3 +
 src/backend/cdb/motion/ic_udp.c                    |  39 +++--
 src/backend/commands/analyze.c                     |   6 +-
 src/backend/commands/copy.c                        |   7 +-
 src/backend/commands/dbcommands.c                  |   5 +-
 src/backend/commands/indexcmds.c                   |  36 ++---
 src/backend/commands/tablecmds.c                   |   5 +-
 src/backend/executor/execDML.c                     |   6 +-
 src/backend/executor/execMain.c                    |   2 +-
 src/backend/executor/nodeExternalscan.c            |   2 +-
 src/backend/optimizer/path/allpaths.c              |   7 +-
 src/backend/optimizer/plan/newPlanner.c            | 121 +++++++++++++---
 src/backend/parser/analyze.c                       |   5 +
 src/backend/storage/buffer/bufmgr.c                |   1 +
 src/backend/tcop/utility.c                         | 143 +++++++++----------
 src/backend/utils/adt/dbsize.c                     |   2 +-
 src/backend/utils/gp/segadmin.c                    |   9 +-
 src/backend/utils/misc/guc.c                       |  10 +-
 src/include/access/orcsegfiles.h                   |   5 +
 src/include/access/xact.h                          |  21 ++-
 src/include/catalog/aoseg.h                        |   1 +
 src/include/cdb/ml_ipc.h                           |   2 +
 .../cwrapper/magma/cwrapper/magma-client-c.h       |  30 +++-
 .../cwrapper/univplan/cwrapper/univplan-c.h        |   3 +-
 src/include/optimizer/newPlanner.h                 |   1 +
 src/include/tcop/utility.h                         |   2 +
 37 files changed, 755 insertions(+), 345 deletions(-)

diff --git a/contrib/hornet/orc_debug_statistics.py 
b/contrib/hornet/orc_debug_statistics.py
index 3ddfb0d..eb61af6 100755
--- a/contrib/hornet/orc_debug_statistics.py
+++ b/contrib/hornet/orc_debug_statistics.py
@@ -81,10 +81,15 @@ col_l = len(col_name)
 for i in range(0,col_l):
     str_ans = str_ans.replace(' Column ' + str(i+1) + ' ',col_name[i])
 
-str_ans = str_ans.replace(',}}}',"}}]}")
+if stripe_num == 0:
+    str_ans = str_ans.replace(',}}}',"}}]}")
+else:
+    str_ans = str_ans.replace(',}}}',"}}}]}")
 str_ans = str_ans.replace('" Stripe 0 ":','"Stripes":[')
-for i in range(1,stripe_num):
-    str_ans = str_ans.replace('" Stripe {} "'.format(i),'},{"Stripe 
{}"'.format(i))
+for i in range(1,2):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}},{{"Stripe 
{}"'.format(i))
+for i in range(2,stripe_num+1):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}}}},{{"Stripe 
{}"'.format(i))
 file_path = '"File_path":"' + sys.argv[2] + '",'
 pre_json = str_ans[0] + file_path + str_ans[1:len(str_ans)]
 print(sys.argv[1] + '|' + pre_json.replace(',}','}'))
diff --git a/contrib/magma/magma.c b/contrib/magma/magma.c
index 08bbf40..b7d6758 100644
--- a/contrib/magma/magma.c
+++ b/contrib/magma/magma.c
@@ -405,7 +405,8 @@ Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaTablePtr table = MagmaClientC_FetchTable(client, snapshot, 
useClientCacheDirectly);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaTablePtr table = MagmaClientC_FetchTable(client, 
useClientCacheDirectly);
   magma_check_result(&client);
 
   elog(LOG, "magma_protocol_blocklocation pass fetch table");
@@ -527,10 +528,11 @@ Datum magma_protocol_tablesize(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
+  MagmaClientC_SetupSnapshot(client, snapshot);
 
   // set size of table in tp type to zero.
   if (tableType == MAGMACLIENTC_TABLETYPE_AP) {
-    tsdata->tablesize = MagmaClientC_GetTableSize(client, snapshot);
+    tsdata->tablesize = MagmaClientC_GetTableSize(client);
   } else {
     tsdata->tablesize = 0;
   }
@@ -583,7 +585,8 @@ Datum magma_protocol_databasesize(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupDatabaseInfo(client, dbname);
-  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client);
   elog(LOG,"dbsize in magma.c is %llu", dbsdata->dbsize);
   magma_check_result(&client);
 
@@ -837,7 +840,8 @@ Datum magma_createindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaClientC_CreateIndex(client, snapshot, magmaidx);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateIndex(client, magmaidx);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -861,7 +865,8 @@ Datum magma_dropindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaClientC_DropIndex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropIndex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -885,7 +890,8 @@ Datum magma_reindex_index(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaClientC_Reindex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_Reindex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -1029,7 +1035,8 @@ Datum magma_createtable(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaClientC_CreateTable(client, snapshot, ncols, cols);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateTable(client, ncols, cols);
   magma_check_result(&client);
   pfree(cols);
   list_free(pk_names);
@@ -1061,7 +1068,8 @@ Datum magma_droptable(PG_FUNCTION_ARGS) {
   int16_t tableType = 0;
   // for drop table, tableType won't be used in the process, set it as default
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, 
tableType);
-  MagmaClientC_DropTable(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropTable(client);
   magma_check_result(&client);
 
   PG_RETURN_VOID();
@@ -3099,80 +3107,111 @@ Datum magma_transaction(PG_FUNCTION_ARGS) {
     elog(ERROR, "failed to connect to magma service");
   }
 
+  MagmaClientC_SetupSnapshot(client, pst->pst_transaction_snapshot);
+
   switch (txn_command) {
-    case PS_TXN_CMD_BEGIN: {
-      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
-      MagmaTableFullName *magmaTableFullNames = (MagmaTableFullName *) 
palloc0(magmaTableFullNamesSize * sizeof(MagmaTableFullName));
-      int i = 0;
-      ListCell *lc;
-      foreach (lc, ps->magma_talbe_full_names) {
-        MagmaTableFullName* mtfn = lfirst(lc);
-        magmaTableFullNames[i].databaseName = pstrdup(mtfn->databaseName);
-        magmaTableFullNames[i].schemaName = pstrdup(mtfn->schemaName);
-        magmaTableFullNames[i].tableName = pstrdup(mtfn->tableName);
-        ++i;
-      }
-      pst->pst_transaction_dist =
-          MagmaClientC_BeginTransaction(client, magmaTableFullNames, 
magmaTableFullNamesSize);
-      for (int i = 0; i < magmaTableFullNamesSize; ++i) {
-        pfree(magmaTableFullNames[i].databaseName);
-        pfree(magmaTableFullNames[i].schemaName);
-        pfree(magmaTableFullNames[i].tableName);
-      }
-      pfree(magmaTableFullNames);
-      if (pst->pst_transaction_dist == NULL) {
-        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
-        pst->pst_transaction_id = InvalidTransactionId;
-        pst->pst_transaction_dist = NULL;
-        elog(DEBUG1, "magma_transaction: begin snapshot: NULL");
-      } else {
-        elog(DEBUG1, "magma_transaction: begin snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
-      }
+    case PS_TXN_CMD_START_TRANSACTION: {
+      pst->pst_transaction_state = MagmaClientC_StartTransaction(client);
+      pst->pst_transaction_snapshot = NULL;
+      pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+      pst->pst_transaction_id = InvalidTransactionId;
+      pst->pst_transaction_snapshot = NULL;
+      elog(DEBUG1, "magma_transaction: start transaction");
       magma_check_result(&client);
       break;
     }
-    case PS_TXN_CMD_COMMIT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_COMMIT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: commit snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: commit snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
-      MagmaClientC_CommitTransaction(client, pst->pst_transaction_dist);
+      MagmaClientC_CommitTransaction(client);
       magma_check_result(&client);
       break;
-    case PS_TXN_CMD_ABORT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_ABORT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: abort snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: abort snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
       if (pst->pst_transaction_status != PS_TXN_STS_DEFAULT &&
           pst->pst_transaction_id != InvalidTransactionId &&
-          pst->pst_transaction_dist != NULL) {
-        MagmaClientC_AbortTransaction(client, pst->pst_transaction_dist,
-                                      PlugStorageGetIsCleanupAbort());
-        pst->pst_transaction_dist = NULL;
+          pst->pst_transaction_snapshot != NULL) {
+        MagmaClientC_AbortTransaction(client, PlugStorageGetIsCleanupAbort());
+        pst->pst_transaction_snapshot = NULL;
         pst->pst_transaction_id = InvalidTransactionId;
         pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
         magma_check_result(&client);
       }
       break;
+    case PS_TXN_CMD_GET_SNAPSHOT: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get snapshot: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get snapshot: (%llu, %u, %llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
+    case PS_TXN_CMD_GET_TRANSACTIONID: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_state = MagmaClientC_GetTransctionId(client);
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get transaction state: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get transaction state: (%llu, %u, 
%llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
     default:
       elog(ERROR, "Transaction command for magma is invalid %d", txn_command);
       break;
diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 976fc71..c0f9236 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -545,6 +545,7 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                        // XXX(changyong): The default zlib compression level 
of ORC table is Z_DEFAULT_COMPRESSION,
                        // and this is different from hive of which default 
compression level is (Z_BEST_SPEED + 1).
                        && (strcmp(compresstype, "zlib") != 0)
+                       && (strcmp(compresstype, "zstd") != 0)
                        && (strcmp(compresstype, "none") != 0))
     {
       ereport(ERROR,
diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index 393852d..3762036 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -270,7 +270,7 @@ static PlugStorageTransactionData 
TopPlugStorageTransactionData = {
        .pst_transaction_id      = InvalidTransactionId,/* transaction id */
        .pst_transaction_status  = PS_TXN_STS_DEFAULT,  /* transaction status */
        .pst_transaction_command = PS_TXN_CMD_INVALID,  /* transaction command 
*/
-       .pst_transaction_dist    = NULL                 /* magma transaction 
info */
+       .pst_transaction_snapshot = NULL                 /* magma transaction 
info */
 };
 
 static PlugStorageTransaction TopPlugStorageTransaction = 
&TopPlugStorageTransactionData;
@@ -372,12 +372,6 @@ PlugStorageGetTransactionStatus(void)
        return TopPlugStorageTransaction->pst_transaction_status;
 }
 
-MagmaSnapshot *
-PlugStorageGetTransactionSnapshot(void)
-{
-       return TopPlugStorageTransaction->pst_transaction_dist;
-}
-
 void PlugStorageSetIsCleanupAbort(bool isCleanup)
 {
   isCleanupAbortTransaction = isCleanup;
@@ -397,52 +391,54 @@ extern void 
PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot)
         */
        if (Gp_role == GP_ROLE_DISPATCH)
        {
-           Insist(TopPlugStorageTransaction->pst_transaction_dist != NULL);
+           Insist(TopPlugStorageTransaction->pst_transaction_snapshot != NULL);
        }
        else if (Gp_role == GP_ROLE_EXECUTE &&
-                TopPlugStorageTransaction->pst_transaction_dist == NULL)
+                TopPlugStorageTransaction->pst_transaction_snapshot == NULL)
        {
-           TopPlugStorageTransaction->pst_transaction_dist = 
malloc(sizeof(MagmaSnapshot));
-           memset(TopPlugStorageTransaction->pst_transaction_dist, 0, 
sizeof(MagmaSnapshot));
-           
TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId = 0;
-           
TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus = 
0;
-           
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset
 = 0;
-           
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions = NULL;
-           
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize = 0;
+           TopPlugStorageTransaction->pst_transaction_snapshot = 
malloc(sizeof(MagmaSnapshot));
+           memset(TopPlugStorageTransaction->pst_transaction_snapshot, 0, 
sizeof(MagmaSnapshot));
+           TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnId = 0;
+           TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnStatus = 0;
+           
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset
 = 0;
+           
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions = 
NULL;
+           
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize = 
0;
        }
 
        // set current transaction for current snapshot
-       
TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId =
+       
TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnId =
                snapshot->currentTransaction.txnId;
-       
TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus =
+       
TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnStatus
 =
                snapshot->currentTransaction.txnStatus;
 
        // set command id
-       TopPlugStorageTransaction->pst_transaction_dist->cmdIdInTransaction =
+       TopPlugStorageTransaction->pst_transaction_snapshot
+            ->cmdIdInTransaction =
                snapshot->cmdIdInTransaction;
 
        // reallocate memory for visible HLC map for current snapshot
-       
free(TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions);
+       
free(TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions);
 
-       
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset
 =
+       
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset
 =
            snapshot->txnActions.txnActionStartOffset;
-       TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions =
+       
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions =
            malloc(sizeof(MagmaTxnAction) *snapshot->txnActions.txnActionSize);
 
        // set visible HLC map for current snapshot
-       
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize =
+       
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize =
            snapshot->txnActions.txnActionSize;
        for (int i = 0; i < snapshot->txnActions.txnActionSize; ++i)
        {
-           
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnId 
=
+           
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnId
 =
                    snapshot->txnActions.txnActions[i].txnId;
-           
TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnStatus
 =
+           
TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnStatus
 =
                    snapshot->txnActions.txnActions[i].txnStatus;
     }
 }
 
-void
-PlugStorageBeginTransaction(List* magmaTableFullNames)
+void PlugStorageStartTransaction(List* magmaTableFullNames)
 {
        if ((Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode())
        {
@@ -460,13 +456,9 @@ PlugStorageBeginTransaction(List* magmaTableFullNames)
                        }
 
                        Assert(pst->pst_transaction_status == 
PS_TXN_STS_DEFAULT);
-                       pst->pst_transaction_command = PS_TXN_CMD_BEGIN;
+                       pst->pst_transaction_command =
+                            PS_TXN_CMD_START_TRANSACTION;
                        InvokePlugStorageFormatTransaction(pst, 
magmaTableFullNames);
-                       elog(DEBUG1, "PS TXN: BEGIN (%llu, %u, %llu, %u)",
-                            
pst->pst_transaction_dist->currentTransaction.txnId,
-                            
pst->pst_transaction_dist->currentTransaction.txnStatus,
-                            
pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-                            
pst->pst_transaction_dist->txnActions.txnActionSize);
                        pst->pst_transaction_id = GetTopTransactionId();
                        pst->pst_transaction_status = PS_TXN_STS_STARTED;
                }
@@ -492,14 +484,15 @@ PlugStorageCommitTransaction(void)
 
                        Assert(pst->pst_transaction_id == 
GetTopTransactionId());
                        Assert(pst->pst_transaction_status == 
PS_TXN_STS_STARTED);
-                       pst->pst_transaction_command = PS_TXN_CMD_COMMIT;
+                       pst->pst_transaction_command =
+                            PS_TXN_CMD_COMMIT_TRANSACTION;
                        elog(DEBUG1, "PS TXN: COMMIT (%llu, %u, %llu, %u)",
-                            
pst->pst_transaction_dist->currentTransaction.txnId,
-                            
pst->pst_transaction_dist->currentTransaction.txnStatus,
-                            
pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-                            
pst->pst_transaction_dist->txnActions.txnActionSize);
+                            
pst->pst_transaction_snapshot->currentTransaction.txnId,
+                            
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+                            
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+                            
pst->pst_transaction_snapshot->txnActions.txnActionSize);
                        InvokePlugStorageFormatTransaction(pst, NULL);
-                       pst->pst_transaction_dist = NULL;
+                       pst->pst_transaction_snapshot = NULL;
                        pst->pst_transaction_id = InvalidTransactionId;
                        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
                }
@@ -525,20 +518,74 @@ PlugStorageAbortTransaction(void)
 
                        Assert(pst->pst_transaction_id == 
GetTopTransactionId());
                        Assert(pst->pst_transaction_status == 
PS_TXN_STS_STARTED);
-                       pst->pst_transaction_command = PS_TXN_CMD_ABORT;
+                       pst->pst_transaction_command =
+                            PS_TXN_CMD_ABORT_TRANSACTION;
                        elog(DEBUG1, "PS TXN: ABORT (%llu, %u, llu, %u)",
-                            
pst->pst_transaction_dist->currentTransaction.txnId,
-                            
pst->pst_transaction_dist->currentTransaction.txnStatus,
-                            
pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-                            
pst->pst_transaction_dist->txnActions.txnActionSize);
+                            
pst->pst_transaction_snapshot->currentTransaction.txnId,
+                            
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+                            
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+                            
pst->pst_transaction_snapshot->txnActions.txnActionSize);
                        InvokePlugStorageFormatTransaction(pst, NULL);
-                       pst->pst_transaction_dist = NULL;
+                       pst->pst_transaction_snapshot = NULL;
                        pst->pst_transaction_id = InvalidTransactionId;
                        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
                }
        }
 }
 
+
+MagmaSnapshot *
+PlugStorageGetTransactionSnapshot(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      (pst->pst_transaction_snapshot == NULL) &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_SNAPSHOT;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET SNAPSHOT (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+  return TopPlugStorageTransaction->pst_transaction_snapshot;
+}
+
+void PlugStorageGetTransactionId(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      pst->pst_transaction_state->currentTransaction.txnId == 0 &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_TRANSACTIONID;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET TRANSACTION ID (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+}
+
 /* ----------------------------------------------------------------
  *     transaction state accessors
  * ----------------------------------------------------------------
@@ -2456,7 +2503,7 @@ StartTransaction(void)
        /*
         * begin transaction in magma service now
         */
-       /* PlugStorageBeginTransaction(); */
+       /* PlugStorageStartTransaction(); */
 }
 
 /*
diff --git a/src/backend/catalog/aoseg.c b/src/backend/catalog/aoseg.c
index b7b93fd..7e88b91 100644
--- a/src/backend/catalog/aoseg.c
+++ b/src/backend/catalog/aoseg.c
@@ -56,7 +56,9 @@
 #include "cdb/cdbvars.h"
 
 static bool create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, 
Oid * comptypeOid);
+static bool create_aoseg_index_table(Relation rel, Oid aosegOid, Oid 
aosegIndexOid, Oid * comptypeOid);
 static bool needs_aoseg_table(Relation rel);
+static bool needs_aoseg_index_table(Relation rel);
 
 /*
  * AlterTableCreateAoSegTable
@@ -106,6 +108,19 @@ AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid, 
Oid newIndexOid,
        heap_close(rel, NoLock);
 }
 
+void
+AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child)
+{
+       Relation        rel;
+       Assert(!is_part_child);
+       rel = heap_open(relOid, AccessShareLock);
+
+       /* create_aoseg_index_table does all the work */
+       (void) create_aoseg_index_table(rel, InvalidOid, InvalidOid, NULL);
+
+       heap_close(rel, AccessShareLock);
+}
+
 /*
  * create_aoseg_table --- internal workhorse
  *
@@ -310,6 +325,139 @@ create_aoseg_table(Relation rel, Oid aosegOid, Oid 
aosegIndexOid, Oid * comptype
        return true;
 }
 
+static bool
+create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * 
comptypeOid)
+{
+       Oid                     relOid = RelationGetRelid(rel);
+       TupleDesc       tupdesc;
+       bool            shared_relation;
+       Oid               blkdirrelid;
+       Oid               blkdiridxid;
+       char            aoseg_relname[NAMEDATALEN];
+       char            aoseg_idxname[NAMEDATALEN];
+       IndexInfo  *indexInfo;
+       Oid                     classObjectId[2];
+       ObjectAddress baseobject,
+                               aosegobject;
+       Oid                     tablespaceOid = 
ChooseTablespaceForLimitedObject(rel->rd_rel->reltablespace);
+
+       /*
+        * Check to see whether the table actually needs an aoseg index table.
+        */
+       if (!needs_aoseg_index_table(rel))
+               return false;
+
+       shared_relation = rel->rd_rel->relisshared;
+
+       /* can't have shared AO tables after initdb */
+       /* TODO: disallow it at CREATE TABLE time */
+       Assert(!(shared_relation && !IsBootstrapProcessingMode()) );
+
+       GetAppendOnlyEntryAuxOids(relOid, SnapshotNow, NULL, NULL, 
&blkdirrelid, &blkdiridxid);
+
+       /*
+        * Was a aoseg index table already created?
+        */
+       if (blkdirrelid != InvalidOid)
+       {
+               return false;
+       }
+
+       snprintf(aoseg_relname, sizeof(aoseg_relname), "pg_orcseg_idx_%u",
+                                        relOid);
+       snprintf(aoseg_idxname, sizeof(aoseg_idxname), "pg_orcseg_idx_%u_index",
+                                        relOid);
+
+       tupdesc = CreateTemplateTupleDesc(Natts_pg_orcseg_idx, false);
+
+       TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_idxoid, 
"idxoid",
+                                                                               
 INT4OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_segno, 
"segno",
+                                                                               
 INT4OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_eof, "eof",
+                                                                               
 FLOAT8OID, -1, 0);
+
+       blkdirrelid = heap_create_with_catalog(aoseg_relname,
+                                                                               
   PG_AOSEGMENT_NAMESPACE,
+                                                                               
   tablespaceOid,
+                                                                               
   aosegOid,
+                                                                               
   rel->rd_rel->relowner,
+                                                                               
   tupdesc,
+                                                                               
   /* relam */ InvalidOid,
+                                                                               
   RELKIND_AOSEGMENTS,
+                                                                               
   RELSTORAGE_HEAP,
+                                                                               
   shared_relation,
+                                                                               
   true,
+                                                                               
   /* bufferPoolBulkLoad */ false,
+                                                                               
   0,
+                                                                               
   ONCOMMIT_NOOP,
+                                                                               
   NULL, /* CDB POLICY */
+                                                                               
   (Datum) 0,
+                                                                               
   true,
+                                                                               
   comptypeOid,
+                                                                               
   /* persistentTid */ NULL,
+                                                                               
   /* persistentSerialNum */ NULL,
+                                                                               
   /* formattername */ NULL);
+
+       /* make the toast relation visible, else index creation will fail */
+       CommandCounterIncrement();
+
+       /*
+        * Create unique index on index oid.
+        */
+       indexInfo = makeNode(IndexInfo);
+       indexInfo->ii_NumIndexAttrs = 1;
+       indexInfo->ii_NumIndexKeyAttrs = 1;
+       indexInfo->ii_KeyAttrNumbers[0] = 1;
+       indexInfo->ii_Expressions = NIL;
+       indexInfo->ii_ExpressionsState = NIL;
+       indexInfo->ii_Predicate = NIL;
+       indexInfo->ii_PredicateState = NIL;
+       indexInfo->ii_Unique = true;
+       indexInfo->ii_Concurrent = false;
+
+       classObjectId[0] = INT4_BTREE_OPS_OID;
+       classObjectId[1] = INT4_BTREE_OPS_OID;
+
+       blkdiridxid = index_create(blkdirrelid, aoseg_idxname, aosegIndexOid,
+                                                          indexInfo,
+                                                          BTREE_AM_OID,
+                                                          tablespaceOid,
+                                                          classObjectId, 
(Datum) 0,
+                                                          true, false, (Oid *) 
NULL, true, false, false, NULL);
+
+       /* Unlock target table -- no one can see it */
+       UnlockRelationOid(blkdirrelid, ShareLock);
+       /* Unlock the index -- no one can see it anyway */
+       UnlockRelationOid(blkdiridxid, AccessExclusiveLock);
+
+       /*
+        * Store the aoseg table's OID in the parent relation's pg_appendonly 
row
+        */
+       UpdateAppendOnlyEntryAuxOids(relOid, InvalidOid, InvalidOid, 
blkdirrelid, blkdiridxid);
+
+       /*
+        * Register dependency from the aoseg table to the master, so that the
+        * aoseg table will be deleted if the master is.
+        */
+       baseobject.classId = RelationRelationId;
+       baseobject.objectId = relOid;
+       baseobject.objectSubId = 0;
+       aosegobject.classId = RelationRelationId;
+       aosegobject.objectId = blkdirrelid;
+       aosegobject.objectSubId = 0;
+
+       recordDependencyOn(&aosegobject, &baseobject, DEPENDENCY_INTERNAL);
+
+       /*
+        * Make changes visible
+        */
+       CommandCounterIncrement();
+
+       return true;
+}
+
+
 /*
  * Check to see whether the table needs an aoseg table.        It does only if 
it is
  * an append-only relation.
@@ -320,4 +468,12 @@ needs_aoseg_table(Relation rel)
        return RelationIsAo(rel);
 }
 
-
+/*
+ * Check to see whether the table needs an aoseg index table.  It does only if 
it is
+ * an append-only orc relation.
+ */
+static bool
+needs_aoseg_index_table(Relation rel)
+{
+       return RelationIsOrc(rel);
+}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e7e3887..e13b97b 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -129,7 +129,7 @@ extern void PQclear(PGresult *res);
 
 static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
                                                                        Oid     
                classid,
-                                                                       Oid     
                objoid, 
+                                                                       Oid     
                objoid,
                                                                        Oid     
                relowner,
                                                                        char*   
        actionname,
                                                                        char*   
        subtype,
@@ -546,14 +546,14 @@ heap_create(const char *relname,
                                                        
rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
                        heap_close(gp_relation_node, RowExclusiveLock);
                }
-#endif 
+#endif
        }
 
        if (Debug_check_for_invalid_persistent_tid &&
                !Persistent_BeforePersistenceWork() &&
                
PersistentStore_IsZeroTid(&rel->rd_relationnodeinfo.persistentTid))
-       {       
-               elog(ERROR, 
+       {
+               elog(ERROR,
                         "setNewRelfilenodeCommon has invalid TID (0,0) into 
relation %u/%u/%u '%s', serial number " INT64_FORMAT,
                         rel->rd_node.spcNode,
                         rel->rd_node.dbNode,
@@ -563,7 +563,7 @@ heap_create(const char *relname,
        }
 
        if (Debug_persistent_print)
-               elog(Persistent_DebugPrintLevel(), 
+               elog(Persistent_DebugPrintLevel(),
                     "heap_create: '%s', Append-Only '%s', persistent TID %s 
and serial number " INT64_FORMAT " for CREATE",
                         relpath(rel->rd_node),
                         (isAppendOnly ? "true" : "false"),
@@ -917,8 +917,8 @@ static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
 } /* end MetaTrackAddUpdInternal */
 
 
-void MetaTrackAddObject(Oid            classid, 
-                                               Oid             objoid, 
+void MetaTrackAddObject(Oid            classid,
+                                               Oid             objoid,
                                                Oid             relowner,
                                                char*   actionname,
                                                char*   subtype)
@@ -959,8 +959,8 @@ void MetaTrackAddObject(Oid         classid,
 
 } /* end MetaTrackAddObject */
 
-void MetaTrackUpdObject(Oid            classid, 
-                                               Oid             objoid, 
+void MetaTrackUpdObject(Oid            classid,
+                                               Oid             objoid,
                                                Oid             relowner,
                                                char*   actionname,
                                                char*   subtype)
@@ -1015,7 +1015,7 @@ void MetaTrackUpdObject(Oid               classid,
                                                                        
classid, objoid, relowner,
                                                                        
actionname, subtype,
                                                                        rel, 
tuple);
-                       
+
 /*                     CommandCounterIncrement(); */
 
                        ii++;
@@ -1026,14 +1026,14 @@ void MetaTrackUpdObject(Oid             classid,
 
        /* add it if it didn't already exist */
        if (!ii)
-               MetaTrackAddObject(classid, 
-                                                  objoid, 
+               MetaTrackAddObject(classid,
+                                                  objoid,
                                                   relowner,
                                                   actionname,
                                                   subtype);
 
 } /* end MetaTrackUpdObject */
-void MetaTrackDropObject(Oid           classid, 
+void MetaTrackDropObject(Oid           classid,
                                                 Oid            objoid)
 {
        int ii = 0;
@@ -1307,7 +1307,7 @@ AddNewRelationTuple(Relation pg_class_desc,
                        /* NOTE: look at cdb_estimate_rel_size() if changing 
these values */
                        if(relstorage_is_external(relstorage))
                        {
-                               new_rel_reltup->relpages = 
gp_external_table_default_number_of_pages; 
+                               new_rel_reltup->relpages = 
gp_external_table_default_number_of_pages;
                                new_rel_reltup->reltuples = 
gp_external_table_default_number_of_tuples;
                        }
                        break;
@@ -1430,8 +1430,8 @@ InsertGpRelfileNodeTuple(
        if (Debug_check_for_invalid_persistent_tid &&
                !Persistent_BeforePersistenceWork() &&
                PersistentStore_IsZeroTid(persistentTid))
-       {       
-               elog(ERROR, 
+       {
+               elog(ERROR,
                         "Inserting with invalid TID (0,0) into relation id %u 
'%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
                         relationId,
                         relname,
@@ -1444,7 +1444,7 @@ InsertGpRelfileNodeTuple(
        memset(nulls, false, sizeof(nulls));
 
        if (Debug_persistent_print)
-               elog(Persistent_DebugPrintLevel(), 
+               elog(Persistent_DebugPrintLevel(),
                         "InsertGpRelationNodeTuple: Inserting into relation id 
%u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT ", TID 
%s",
                         relationId,
                         relname,
@@ -1499,8 +1499,8 @@ UpdateGpRelfileNodeTuple(
        if (Debug_check_for_invalid_persistent_tid &&
                !Persistent_BeforePersistenceWork() &&
                PersistentStore_IsZeroTid(persistentTid))
-       {       
-               elog(ERROR, 
+       {
+               elog(ERROR,
                         "Updating with invalid TID (0,0) in relfilenode %u, 
segment file #%d, serial number " INT64_FORMAT,
                         relfilenode,
                         segmentFileNum,
@@ -1508,7 +1508,7 @@ UpdateGpRelfileNodeTuple(
        }
 
        if (Debug_persistent_print)
-               elog(Persistent_DebugPrintLevel(), 
+               elog(Persistent_DebugPrintLevel(),
                         "UpdateGpRelationNodeTuple: Updating relfilenode %u, 
segment file #%d, serial number " INT64_FORMAT " at TID %s",
                         relfilenode,
                         segmentFileNum,
@@ -1521,7 +1521,7 @@ UpdateGpRelfileNodeTuple(
 
        repl_repl[Anum_gp_relfile_node_relfilenode_oid - 1] = true;
        repl_val[Anum_gp_relfile_node_relfilenode_oid - 1] = 
ObjectIdGetDatum(relfilenode);
-       
+
        repl_repl[Anum_gp_relfile_node_segment_file_num - 1] = true;
        repl_val[Anum_gp_relfile_node_segment_file_num - 1] = 
Int32GetDatum(segmentFileNum);
 
@@ -1529,12 +1529,12 @@ UpdateGpRelfileNodeTuple(
 
        repl_repl[Anum_gp_relfile_node_persistent_tid- 1] = true;
        repl_val[Anum_gp_relfile_node_persistent_tid- 1] = 
PointerGetDatum(persistentTid);
-       
+
        repl_repl[Anum_gp_relfile_node_persistent_serial_num - 1] = true;
        repl_val[Anum_gp_relfile_node_persistent_serial_num - 1] = 
Int64GetDatum(persistentSerialNum);
 
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(gp_relfile_node), 
repl_val, repl_null, repl_repl);
-       
+
        simple_heap_update(gp_relfile_node, &newtuple->t_self, newtuple);
 
        CatalogUpdateIndexes(gp_relfile_node, newtuple);
@@ -1559,7 +1559,7 @@ AddNewRelfileNodeTuple(
                                                        /* updateIndex */ true,
                                                        
&new_rel->rd_relationnodeinfo.persistentTid,
                                                        
new_rel->rd_relationnodeinfo.persistentSerialNum);
-                                                       
+
        }
 }
 
@@ -1603,7 +1603,7 @@ heap_create_with_catalog(const char *relname,
 
        pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
-    // When creating gp_persistent_relfile_node, we can't directly insert meta 
info into gp_relfile_node 
+    // When creating gp_persistent_relfile_node, we can't directly insert meta 
info into gp_relfile_node
     // for this table is renamed from gp_relation_node, also it's schema 
changed.
        if (IsBootstrapProcessingMode()|| (gp_upgrade_mode && 
GpPersistent_IsPersistentRelation(relid)))
                gp_relfile_node_desc = NULL;
@@ -1630,7 +1630,7 @@ heap_create_with_catalog(const char *relname,
                }
                relstorage = stdRdOptions->columnstore;
        }
-       
+
        if (IsBuiltinTablespace(reltablespace) && appendOnlyRel) {
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -1669,7 +1669,7 @@ heap_create_with_catalog(const char *relname,
                                                                 
stdRdOptions->columnstore);
 
        /* MPP-8058: disallow OIDS on column-oriented tables */
-       if (tupdesc->tdhasoid && 
+       if (tupdesc->tdhasoid &&
                IsNormalProcessingMode() &&
         (Gp_role == GP_ROLE_DISPATCH))
        {
@@ -1982,11 +1982,11 @@ heap_create_with_catalog(const char *relname,
                }
 
                /* MPP-7576: don't track internal namespace tables */
-               switch (relnamespace) 
+               switch (relnamespace)
                {
                        case PG_CATALOG_NAMESPACE:
                                /* MPP-7773: don't track objects in system 
namespace
-                                * if modifying system tables (eg during 
upgrade)  
+                                * if modifying system tables (eg during 
upgrade)
                                 */
                                if (allowSystemTableModsDDL)
                                        doIt = false;
@@ -2330,7 +2330,7 @@ RemoveAttrDefault(Oid relid, AttrNumber attnum,
                object.objectSubId = 0;
 
                performDeletion(&object, behavior);
-               
+
                found = true;
        }
 
@@ -2429,9 +2429,9 @@ remove_gp_relation_node_and_schedule_drop(
        Relation        rel)
 {
        PersistentFileSysRelStorageMgr relStorageMgr;
-       
+
        if (Debug_persistent_print)
-               elog(Persistent_DebugPrintLevel(), 
+               elog(Persistent_DebugPrintLevel(),
                         "remove_gp_relation_node_and_schedule_drop: dropping 
relation '%s', relation id %u '%s', relfilenode %u",
                         rel->rd_rel->relname.data,
                         rel->rd_id,
@@ -2449,9 +2449,9 @@ remove_gp_relation_node_and_schedule_drop(
                DeleteGpRelfileNodeTuple(
                                                                rel,
                                                                /* 
segmentFileNum */ 0);
-               
+
                if (Debug_persistent_print)
-                       elog(Persistent_DebugPrintLevel(), 
+                       elog(Persistent_DebugPrintLevel(),
                                 "remove_gp_relation_node_and_schedule_drop: 
For Buffer Pool managed relation '%s' persistent TID %s and serial number " 
INT64_FORMAT " for DROP",
                                 relpath(rel->rd_node),
                                 
ItemPointerToString(&rel->rd_relationnodeinfo.persistentTid),
@@ -2465,7 +2465,7 @@ remove_gp_relation_node_and_schedule_drop(
                int32 segmentFileNum;
                ItemPointerData persistentTid;
                int64 persistentSerialNum;
-               
+
                relNodeRelation = heap_open(GpRelfileNodeRelationId, 
RowExclusiveLock);
 
                GpRelfileNodeBeginScan(
@@ -2473,7 +2473,7 @@ remove_gp_relation_node_and_schedule_drop(
                                                rel->rd_id,
                                                rel->rd_rel->relfilenode,
                                                &gpRelfileNodeScan);
-               
+
                while ((tuple = GpRelfileNodeGetNext(
                                                                
&gpRelfileNodeScan,
                                                                &segmentFileNum,
@@ -2481,16 +2481,16 @@ remove_gp_relation_node_and_schedule_drop(
                                                                
&persistentSerialNum)))
                {
                        if (Debug_persistent_print)
-                               elog(Persistent_DebugPrintLevel(), 
+                               elog(Persistent_DebugPrintLevel(),
                                         
"remove_gp_relation_node_and_schedule_drop: For Append-Only relation %u 
relfilenode %u scanned segment file #%d, serial number " INT64_FORMAT " at TID 
%s for DROP",
                                         rel->rd_id,
                                         rel->rd_rel->relfilenode,
                                         segmentFileNum,
                                         persistentSerialNum,
                                         ItemPointerToString(&persistentTid));
-                       
+
                        simple_heap_delete(relNodeRelation, &tuple->t_self);
-                       
+
                        MirroredFileSysObj_ScheduleDropAppendOnlyFile(
                                                                                
        &rel->rd_node,
                                                                                
        segmentFileNum,
@@ -2498,9 +2498,9 @@ remove_gp_relation_node_and_schedule_drop(
                                                                                
        &persistentTid,
                                                                                
        persistentSerialNum);
                }
-               
+
                GpRelfileNodeEndScan(&gpRelfileNodeScan);
-               
+
                heap_close(relNodeRelation, RowExclusiveLock);
 
                /*
@@ -2682,8 +2682,9 @@ heap_drop_with_catalog(Oid relid)
                                // start transaction in magma for DROP TABLE
                                if (PlugStorageGetTransactionStatus() == 
PS_TXN_STS_DEFAULT)
                                {
-                                   PlugStorageBeginTransaction(NULL);
+                                  PlugStorageStartTransaction();
                                }
+                                PlugStorageGetTransactionId(NULL);
                                Assert(PlugStorageGetTransactionStatus() == 
PS_TXN_STS_STARTED);
 
                                // drop table in magma now
@@ -2693,7 +2694,7 @@ heap_drop_with_catalog(Oid relid)
                                                     database_name,
                                                     schema_name,
                                                     table_name,
-                                                    
PlugStorageGetTransactionSnapshot());
+                                                    
PlugStorageGetTransactionSnapshot(NULL));
                                
ReadCacheHashEntryReviseOnCommit(RelationGetRelid(rel), true);
 
                        }
@@ -2718,7 +2719,7 @@ heap_drop_with_catalog(Oid relid)
 
        if (is_foreign_rel)
                RemoveForeignTableEntry(relid);
-       
+
        /*
         * delete distribution policy if present
         */
@@ -2815,12 +2816,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char 
*adbin)
 
        adrcqCtx = caql_beginscan(
                        caql_addrel(cqclr(&cqc), adrel),
-                       cql("INSERT INTO pg_attrdef ", 
+                       cql("INSERT INTO pg_attrdef ",
                                NULL));
 
        if (Debug_check_for_invalid_persistent_tid)
-       {       
-               elog(LOG, 
+       {
+               elog(LOG,
                         "StoreAttrDefault[1] relation %u/%u/%u '%s', isPresent 
%s, serial number " INT64_FORMAT ", TID %s",
                         adrel->rd_node.spcNode,
                         adrel->rd_node.dbNode,
@@ -2835,8 +2836,8 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char 
*adbin)
        RelationFetchGpRelationNodeForXLog(adrel);
 
        if (Debug_check_for_invalid_persistent_tid)
-       {       
-               elog(LOG, 
+       {
+               elog(LOG,
                         "StoreAttrDefault[2] relation %u/%u/%u '%s', isPresent 
%s, serial number " INT64_FORMAT ", TID %s",
                         adrel->rd_node.spcNode,
                         adrel->rd_node.dbNode,
@@ -3518,7 +3519,7 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
                                        " WHERE starelid = :1 "
                                        " AND staattnum = :2 ",
                                        ObjectIdGetDatum(relid),
-                                       Int16GetDatum(attnum)));                
 
+                                       Int16GetDatum(attnum)));
        }
 }
 
@@ -3550,7 +3551,7 @@ RelationTruncateIndexes(Relation heapRelation)
 
                /* Now truncate the actual file (and discard buffers) */
                RelationTruncate(
-                                       currentIndex, 
+                                       currentIndex,
                                        0,
                                        /* markPersistentAsPhysicallyTruncated 
*/ true);
 
@@ -3636,7 +3637,7 @@ heap_truncate(List *relids)
 
                /* Truncate the actual file (and discard buffers) */
                RelationTruncate(
-                                       rel, 
+                                       rel,
                                        0,
                                        /* markPersistentAsPhysicallyTruncated 
*/ false);
 
@@ -3888,7 +3889,7 @@ setNewRelfilenodeCommon(Relation relation, Oid 
newrelfilenode)
        newrnode.relNode = newrelfilenode;
 
        isAppendOnly = RelationIsAo(relation);
-       
+
        relname = RelationGetRelationName(relation);
 
        if (!isAppendOnly)
@@ -3897,7 +3898,7 @@ setNewRelfilenodeCommon(Relation relation, Oid 
newrelfilenode)
 
                PersistentFileSysRelStorageMgr localRelStorageMgr;
                PersistentFileSysRelBufpoolKind relBufpoolKind;
-               
+
                GpPersistentRelfileNode_GetRelfileInfo(
                                                                                
        relation->rd_rel->relkind,
                                                                                
        relation->rd_rel->relstorage,
@@ -3905,9 +3906,9 @@ setNewRelfilenodeCommon(Relation relation, Oid 
newrelfilenode)
                                                                                
        &localRelStorageMgr,
                                                                                
        &relBufpoolKind);
                Assert(localRelStorageMgr == 
PersistentFileSysRelStorageMgr_BufferPool);
-               
+
                srel = smgropen(newrnode);
-       
+
                MirroredFileSysObj_TransactionCreateBufferPoolFile(
                                                                                
        srel,
                                                                                
        relBufpoolKind,
@@ -3931,8 +3932,8 @@ setNewRelfilenodeCommon(Relation relation, Oid 
newrelfilenode)
        if (Debug_check_for_invalid_persistent_tid &&
                !Persistent_BeforePersistenceWork() &&
                
PersistentStore_IsZeroTid(&relation->rd_relationnodeinfo.persistentTid))
-       {       
-               elog(ERROR, 
+       {
+               elog(ERROR,
                         "setNewRelfilenodeCommon has invalid TID (0,0) for 
relation %u/%u/%u '%s', serial number " INT64_FORMAT,
                         newrnode.spcNode,
                         newrnode.dbNode,
@@ -3942,9 +3943,9 @@ setNewRelfilenodeCommon(Relation relation, Oid 
newrelfilenode)
        }
 
        relation->rd_relationnodeinfo.isPresent = true;
-       
+
        if (Debug_persistent_print)
-               elog(Persistent_DebugPrintLevel(), 
+               elog(Persistent_DebugPrintLevel(),
                         "setNewRelfilenodeCommon: NEW '%s', Append-Only '%s', 
persistent TID %s and serial number " INT64_FORMAT,
                         relpath(newrnode),
                         (isAppendOnly ? "true" : "false"),
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9b4762e..7747e5f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -977,8 +977,9 @@ index_create(Oid heapRelationId,
                // 2. start transaction in magma for CREATE INDEX
                if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
                {
-                       PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
                }
+                PlugStorageGetTransactionId(NULL);
                Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
                // 3. call InvokeMagmaCreateIndex
@@ -1006,7 +1007,7 @@ index_create(Oid heapRelationId,
                InvokeMagmaCreateIndex(
                                &procInfo, database_name, schemaname,
                                tablename, &idxinfo,
-                               PlugStorageGetTransactionSnapshot());
+                               PlugStorageGetTransactionSnapshot(NULL));
                // free memory
                pfree(idxinfo.indkey);
        }
@@ -1281,8 +1282,9 @@ index_drop(Oid indexId)
                // 2. start transaction in magma for DROP INDEX
                if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
                {
-                       PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
                }
+                PlugStorageGetTransactionId(NULL);
                Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
                // 3. call InvokeMagmaDropIndex
@@ -1302,7 +1304,7 @@ index_drop(Oid indexId)
                InvokeMagmaDropIndex(
                                &procInfo, database_name, schemaname,
                                tablename, indexName,
-                               PlugStorageGetTransactionSnapshot());
+                               PlugStorageGetTransactionSnapshot(NULL));
        }
 
        /*
@@ -1475,7 +1477,7 @@ index_update_stats(Relation rel, bool hasindex, bool 
isprimary,
         * this relpages are only needed by QE,
         * when this is a magma table, just ignore this info.
         */
-       if (!((RelationIsExternal(rel) && RelationIsMagmaTable(rel->rd_id))))
+       if (!RelationIsMagmaTable2(rel->rd_id))
                relpages = RelationGetNumberOfBlocks(rel);
        Oid                     relid = RelationGetRelid(rel);
        Relation        pg_class;
@@ -2846,8 +2848,9 @@ reindex_index(Oid indexId, Oid newrelfilenode, List 
**extra_oids)
                // 2. start transaction in magma for REINDEX INDEX
                if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
                {
-                       PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
                }
+                PlugStorageGetTransactionId(NULL);
                Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
                // 3. call InvokeMagmaReindexIndex
@@ -2867,7 +2870,7 @@ reindex_index(Oid indexId, Oid newrelfilenode, List 
**extra_oids)
                InvokeMagmaReindexIndex(
                                &procInfo, database_name, schemaname,
                                tablename, indexName,
-                               PlugStorageGetTransactionSnapshot());
+                               PlugStorageGetTransactionSnapshot(NULL));
        }
 
        /* Close rels, but keep locks */
diff --git a/src/backend/catalog/pg_compression.c 
b/src/backend/catalog/pg_compression.c
index 1a10d09..4df1427 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -582,7 +582,8 @@ compresstype_is_valid(char *comptype)
        {
                if(strcmp(comptype, "snappy") == 0 ||
                    strcmp(comptype, "gzip") == 0 ||
-                   strcmp(comptype, "lz4") == 0)
+                   strcmp(comptype, "lz4") == 0 ||
+                   strcmp(comptype, "zstd") == 0)
                        found = true;
        }
 
diff --git a/src/backend/cdb/cdbdatalocality.c 
b/src/backend/cdb/cdbdatalocality.c
index 278103b..0e908fc 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -1073,9 +1073,16 @@ int64 
get_block_locations_and_calculate_table_size(split_to_segment_mapping_cont
           // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
           if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
           {
-                  PlugStorageBeginTransaction(magmaTableFullNames);
+            PlugStorageStartTransaction();
                   useClientCacheDirectly = true;
           }
+          if (((PlannedStmt *)context->srtc_context.base.node)->commandType ==
+                  CMD_SELECT ||
+              context->isTargetNoMagma) {
+            PlugStorageGetTransactionSnapshot(magmaTableFullNames);
+          } else {
+            PlugStorageGetTransactionId(magmaTableFullNames);
+          }
           Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   }
 
@@ -2886,7 +2893,7 @@ static void ExternalGetMagmaRangeDataLocation(
                        // get range location from magma now
                        Assert(PlugStorageGetTransactionStatus() == 
PS_TXN_STS_STARTED);
                        InvokeMagmaProtocolBlockLocation(ext_entry, procOid, 
dbname, schemaname,
-                                                        tablename, 
PlugStorageGetTransactionSnapshot(),
+                                                        tablename, 
PlugStorageGetTransactionSnapshot(NULL),
                                                         useClientCacheDirectly,
                                                         &bldata);
                }
@@ -6698,14 +6705,15 @@ void build_magma_scansplits_for_result_relations(List 
**alloc_result, List *relO
     // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
+    PlugStorageGetTransactionId(NULL);
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
     ExtProtocolBlockLocationData *bldata = NULL;
     InvokeMagmaProtocolBlockLocation(
         ext_entry, procOid, dbname, schemaname, tablename,
-        PlugStorageGetTransactionSnapshot(), false, &bldata);
+        PlugStorageGetTransactionSnapshot(NULL), false, &bldata);
 
     pfree(dbname);
     pfree(schemaname);
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c 
b/src/backend/cdb/cdbquerycontextdispatching.c
index 9ffb5e5..fcf11d8 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -868,7 +868,7 @@ RebuildPlugStorageSnapshot(QueryContextInfo *cxt)
 
        pfree(snapshot.txnActions.txnActions);
 
-       MagmaSnapshot *s = PlugStorageGetTransactionSnapshot();
+       MagmaSnapshot *s = PlugStorageGetTransactionSnapshot(NULL);
 
        elog(LOG, "SNAPSHOT DEBUG: GET TOP (%llu, %u, %llu, %u, %d)",
             s->currentTransaction.txnId,
@@ -1703,7 +1703,7 @@ prepareDispatchedCatalogSingleRelation(QueryContextInfo 
*cxt, Oid relid,
                        pfree(formatterName);
        }
        /* The pluggable storage snapshot must be dispatched */
-       prepareDispatchedPlugStorageSnapshot(cxt, 
PlugStorageGetTransactionSnapshot());
+       prepareDispatchedPlugStorageSnapshot(cxt, 
PlugStorageGetTransactionSnapshot(NULL));
 
        /* The distribution policy for table */
        prepareDispatchedCatalogDistributionPolicy(cxt, relid);
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 226ffae..f29514a 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -1180,6 +1180,9 @@ static void dispatcher_serialize_common_plan(DispatchData 
*data, CommonPlanConte
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", 
new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index f00afce..ee6cc69 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -709,6 +709,9 @@ static void 
dispatcher_serialize_common_plan(MainDispatchData *data,
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", 
new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index 0c7030c..2f754c3 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -778,8 +778,6 @@ static bool SendChunkUDP(MotionLayerState *mlStates, 
ChunkTransportState *transp
 
 static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 
motNodeID);
 static bool dispatcherAYT(void);
-static void checkQDConnectionAlive(void);
-
 
 static void *rxThreadFunc(void *arg);
 
@@ -5923,24 +5921,6 @@ formatSockAddr(struct sockaddr *sa, char* buf, int 
bufsize)
 }                                                              /* 
formatSockAddr */
 
 /*
- * checkQDConnectionAlive
- *             Check whether QD connection is still alive. If not, report 
error.
- */
-static void
-checkQDConnectionAlive(void)
-{
-       if (!dispatch_validate_conn(MyProcPort->sock))
-       {
-               if (Gp_role == GP_ROLE_EXECUTE)
-                       ereport(ERROR, 
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-                                                       errmsg("Interconnect 
error segment lost contact with master (recv)")));
-               else
-                       ereport(ERROR, 
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-                                                       errmsg("Interconnect 
error master lost contact with client (recv)")));
-       }
-}
-
-/*
  * getCurrentTime
  *             get current time
  *
@@ -6967,3 +6947,22 @@ WaitInterconnectQuitUDP(void)
        }
        ic_control_info.threadCreated = false;
 }
+
+
+/*
+ * checkQDConnectionAlive
+ *    Check whether QD connection is still alive. If not, report error.
+ */
+void
+checkQDConnectionAlive(void)
+{
+  if (!dispatch_validate_conn(MyProcPort->sock))
+  {
+    if (Gp_role == GP_ROLE_EXECUTE)
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error segment lost contact with master 
(recv)")));
+    else
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error master lost contact with client 
(recv)")));
+  }
+}
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index b3fcf4a..6b61415 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -3872,7 +3872,7 @@ int64 GetExternalTotalBytesMAGMA(Relation relation){
 
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
     InvokeMagmaProtocolTableSize(ext_entry, procOid, dbname, schemaname, 
tablename,
-                                 PlugStorageGetTransactionSnapshot(), &ts);
+                                 PlugStorageGetTransactionSnapshot(NULL), &ts);
 
     pfree(dbname);
     pfree(schemaname);
@@ -3899,7 +3899,7 @@ int64 GetDatabaseTotalBytesMAGMA(Oid dbOid){
 
                Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
                InvokeMagmaProtocolDatabaseSize(procOid, dbname,
-                                               
PlugStorageGetTransactionSnapshot(),
+                                               
PlugStorageGetTransactionSnapshot(NULL),
                                                &dbs);
 
                pfree(dbname);
@@ -4099,7 +4099,7 @@ uint64 GetExternalTotalBytes(Relation rel)
                        /* start transaction for magma table */
       if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
       {
-        PlugStorageBeginTransaction(NULL);
+        PlugStorageStartTransaction();
       }
                        Assert(PlugStorageGetTransactionStatus() == 
PS_TXN_STS_STARTED);
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 92f7098..ed98b00 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1290,8 +1290,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                {
                        if (PlugStorageGetTransactionStatus() == 
PS_TXN_STS_DEFAULT)
                        {
-                               PlugStorageBeginTransaction(NULL);
+                          PlugStorageStartTransaction();
                        }
+                        PlugStorageGetTransactionId(NULL);
                        Assert(PlugStorageGetTransactionStatus() == 
PS_TXN_STS_STARTED);
                }
 
@@ -2574,7 +2575,7 @@ CopyTo(CopyState cstate)
                                        currentScanDesc = 
InvokePlugStorageFormatBeginScan(
                                                        &beginScanFunc, 
cstate->planstmt, node, &(externalstate.ss),
                                                        serializeSchema, 
serializeSchemaLen, rel,
-                                                       formatterType, 
formatterName, PlugStorageGetTransactionSnapshot());
+                                                       formatterType, 
formatterName, PlugStorageGetTransactionSnapshot(NULL));
                                }
                                else
                                {
@@ -4562,7 +4563,7 @@ CopyFrom(CopyState cstate)
                                                                                
                  formatterName,
                                                                                
                  plannedstmt,
                                                                                
                  segfileinfo->segno,
-                                                                               
                  PlugStorageGetTransactionSnapshot());
+                                                                               
                  PlugStorageGetTransactionSnapshot(NULL));
 
                                                        pfree(insertInitFunc);
                                                        pfree(plannedstmt);
diff --git a/src/backend/commands/dbcommands.c 
b/src/backend/commands/dbcommands.c
index 072197a..126861c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1530,8 +1530,9 @@ dropdb(const char *dbname, bool missing_ok)
                                                // start transaction in magma 
for DROP TABLE
                                                if 
(PlugStorageGetTransactionStatus()
                                                                == 
PS_TXN_STS_DEFAULT) {
-                                                       
PlugStorageBeginTransaction(NULL);
+                                                  
PlugStorageStartTransaction();
                                                }
+                                                
PlugStorageGetTransactionId(NULL);
                                                Assert(
                                                                
PlugStorageGetTransactionStatus()
                                                                                
== PS_TXN_STS_STARTED);
@@ -1539,7 +1540,7 @@ dropdb(const char *dbname, bool missing_ok)
                                                // drop table in magma now
                                                InvokeMagmaDropTable(&procInfo, 
dbInfoRel->exttable, database_name,
                                                                schema_name, 
table_name,
-                                                               
PlugStorageGetTransactionSnapshot());
+                                                               
PlugStorageGetTransactionSnapshot(NULL));
                                                
ReadCacheHashEntryReviseOnCommit(dbInfoRel->relationOid, true);
                                        }
                                }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 7d43c8f..345de0b 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -34,6 +34,7 @@
 
 #include "postgres.h"
 
+#include "access/aosegfiles.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/fileam.h"
@@ -423,10 +424,11 @@ DefineIndex(Oid relationId,
                                 errmsg("access method \"%s\" does not support 
multicolumn indexes",
                                                accessMethodName)));
 
-    if  (unique && RelationIsAo(rel))
-        ereport(ERROR,
-                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                 errmsg("append-only tables do not support unique indexes")));
+
+       /* native orc can't support unique/primary index */
+       if (unique && RelationIsOrc(rel))
+               ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("native orc do not support unique 
indexes")));
 
        amoptions = accessMethodForm->amoptions;
 
@@ -752,24 +754,14 @@ DefineIndex(Oid relationId,
                                                           
errOmitLocation(true)));
         }
         else
-                   {
-          char *formatOpt = caql_getcstring(
-                  NULL,
-                  cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
-                  ObjectIdGetDatum(relationId)));
-          if (!formatOpt) {
-            ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                    errmsg("Cannot support DefineIndex")));
-          }
-          else {
-            char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-            if (!formatName ||
-                (!(pg_strncasecmp(formatName, "magma", strlen("magma")) == 
0))) {
-              ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                      errmsg("Cannot support DefineIndex")));
-            }
-          }
-                   }
+        {
+               /* magma and native orc support index */
+               if (!(RelationIsOrc(rel) || RelationIsMagmaTable2(relationId)))
+               {
+                       ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), 
errmsg("Cannot support DefineIndex")));
+               }
+               // dispatch_statement_node((Node *)stmt, NULL, NULL, NULL);
+        }
        }
 
        /* save lockrelid for below, then close rel */
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3460af4..a92eaa6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1833,14 +1833,15 @@ DefineExternalRelation(CreateExternalStmt 
*createExtStmt)
                // start transaction in magma for CREATE TABLE
                if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
                {
-                   PlugStorageBeginTransaction(NULL);
+                   PlugStorageStartTransaction();
                }
+                PlugStorageGetTransactionId(NULL);
                Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
                InvokeMagmaCreateTable(&procInfo,
                                       database_name,
                                       schema_name,
                                       table_name,
-                                      PlugStorageGetTransactionSnapshot(),
+                                      PlugStorageGetTransactionSnapshot(NULL),
                                       createExtStmt->base.tableElts,
                                       createExtStmt->pkey,
                                       createExtStmt->base.distributedBy,
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index c6a0384..bd06077 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -470,7 +470,7 @@ ExecInsert(TupleTableSlot *slot,
                                                                                
                                         formatterName,
                                                                                
                                         estate->es_plannedstmt,
                                                                                
                                         segfileinfo->segno,
-                                                                               
                                         PlugStorageGetTransactionSnapshot());
+                                                                               
                                         
PlugStorageGetTransactionSnapshot(NULL));
                                }
                                else
                                {
@@ -723,7 +723,7 @@ ldelete:;
                            InvokeMagmaBeginDelete(&procInfo,
                                                   resultRelationDesc,
                                                   estate->es_plannedstmt,
-                                                  
PlugStorageGetTransactionSnapshot());
+                                                  
PlugStorageGetTransactionSnapshot(NULL));
                    }
                else
                {
@@ -1043,7 +1043,7 @@ lreplace:;
                                    InvokeMagmaBeginUpdate(&procInfo,
                                                           resultRelationDesc,
                                                           
estate->es_plannedstmt,
-                                                          
PlugStorageGetTransactionSnapshot());
+                                                          
PlugStorageGetTransactionSnapshot(NULL));
                            elog(LOG, "exec update begin update: %d", 
extUpdDescEntry->ext_ins_oid);
                        }
                        else
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 71f49ef..5de2981 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -5084,7 +5084,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
                                       formatterName,
                                       estate->es_plannedstmt,
                                       segfileinfo->segno,
-                                      PlugStorageGetTransactionSnapshot());
+                                      PlugStorageGetTransactionSnapshot(NULL));
                }
                else
                {
diff --git a/src/backend/executor/nodeExternalscan.c 
b/src/backend/executor/nodeExternalscan.c
index 68d7add..2d203a7 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -355,7 +355,7 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, 
int eflags)
                                        currentRelation,
                                        formatterType,
                                        formatterName,
-                                       PlugStorageGetTransactionSnapshot());
+                                       
PlugStorageGetTransactionSnapshot(NULL));
                }
                else
                {
diff --git a/src/backend/optimizer/path/allpaths.c 
b/src/backend/optimizer/path/allpaths.c
index c635d8f..df5775c 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -386,8 +386,11 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, 
RangeTblEntry *rte)
         pathlist = lappend(pathlist, seqpath);
 
        /* Consider index and bitmap scans */
-       create_index_paths(root, rel, relstorage, 
-                                          &indexpathlist, &bitmappathlist);
+  if (!relstorage_is_ao(relstorage))
+  {
+       /* Temporarily disable index for ao table */
+       create_index_paths(root, rel, relstorage, &indexpathlist, 
&bitmappathlist);
+  }
 
        /* deal with magma index scan */
        if (relstorage == RELSTORAGE_EXTERNAL)
diff --git a/src/backend/optimizer/plan/newPlanner.c 
b/src/backend/optimizer/plan/newPlanner.c
index 4840f47..dc5546e 100644
--- a/src/backend/optimizer/plan/newPlanner.c
+++ b/src/backend/optimizer/plan/newPlanner.c
@@ -17,6 +17,7 @@
  */
 
 #include "optimizer/newPlanner.h"
+#include "catalog/catalog.h"
 
 #include "access/aomd.h"
 #include "access/fileam.h"
@@ -54,6 +55,7 @@ char *new_executor_enable_partitioned_hashjoin_mode;
 char *new_executor_enable_external_sort_mode;
 int new_executor_partitioned_hash_recursive_depth_limit;
 int new_executor_ic_tcp_client_limit_per_query_per_segment;
+int new_executor_external_sort_memory_limit_size_mb;
 
 const char *new_executor_runtime_filter_mode;
 const char *new_executor_runtime_filter_mode_local = "local";
@@ -99,6 +101,8 @@ static void
 do_convert_magma_rangevseg_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_rangetbl_to_common_plan(List *rtable,
                                                CommonPlanContext *ctx);
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx);
 static void do_convert_token_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_splits_list_to_common_plan(List *splits, Oid relOid,
@@ -133,6 +137,19 @@ static bool checkSupportedSubLinkType(SubLinkType 
sublinkType);
 static bool checkInsertSupportTable(PlannedStmt *stmt);
 static bool checkIsPrepareQuery(QueryDesc *queryDesc);
 
+// @return format string whose life time goes along with current MemoryContext
+static const char *buildInternalTableFormatOptionStringInJson(Relation rel) {
+  AppendOnlyEntry *aoentry =
+      GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
+  StringInfoData option;
+  initStringInfo(&option);
+  appendStringInfoChar(&option, '{');
+  if (aoentry->compresstype)
+    appendStringInfo(&option, "%s", aoentry->compresstype);
+  appendStringInfoChar(&option, '}');
+  return option.data;
+}
+
 #define DIRECT_LEFT_CHILD_VAR 0
 #define INT64_MAX_LENGTH 20
 
@@ -313,6 +330,27 @@ void convert_to_common_plan(PlannedStmt *stmt, 
CommonPlanContext *ctx) {
         pfree(rgId);
         pfree(rgUrl);
       }
+      // For append-only internal table
+      if (get_relation_storage_type(oid) == RELSTORAGE_ORC) {
+        ListCell *lc;
+        foreach (lc, stmt->result_segfileinfos) {
+          ResultRelSegFileInfoMapNode *pRelSegFileInfoMapNode =
+              (ResultRelSegFileInfoMapNode *)lfirst(lc);
+          ListCell *lc;
+          foreach (lc, pRelSegFileInfoMapNode->segfileinfos) {
+            ResultRelSegFileInfo *pSegFileInfo = lfirst(lc);
+            if (pSegFileInfo->numfiles == 0) {
+              // detect mixed-up partition of external table
+              ctx->convertible = false;
+              return;
+            }
+            univPlanAddResultRelSegFileInfo(
+                ctx->univplan, pRelSegFileInfoMapNode->relid,
+                pSegFileInfo->segno, pSegFileInfo->eof[0],
+                pSegFileInfo->uncompressed_eof[0]);
+          }
+        }
+      }
       univPlanAddToPlanNode(ctx->univplan, true);
     }
     do_convert_plantree_to_common_plan(stmt->planTree, pid, true, false, NIL,
@@ -327,8 +365,9 @@ void convert_to_common_plan(PlannedStmt *stmt, 
CommonPlanContext *ctx) {
       do_convert_plantree_to_common_plan(subplan, -1, true, true, NIL, NULL,
                                          true, ctx);
   }
-  if (ctx->convertible)
-    do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible) do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible && stmt->result_partitions)
+    do_convert_result_partitions_to_common_plan(stmt->result_partitions, ctx);
   if (ctx->convertible && enable_secure_filesystem)
     do_convert_token_map_to_common_plan(ctx);
   if (ctx->convertible && ctx->isMagma)
@@ -1294,9 +1333,11 @@ void do_convert_onetbl_to_common_plan(Oid relid, 
CommonPlanContext *ctx) {
       columnDataTypeMod[i] = att->atttypmod;
     }
     FormatType fmttype = UnivPlanOrcFormat;
-    univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, "dummy", "{}",
-                                  attNum, (const char **)columnName,
-                                  columnDataType, columnDataTypeMod, NULL);
+    univPlanRangeTblEntryAddTable(
+        ctx->univplan, relid, fmttype, relpath(rel->rd_node),
+        buildInternalTableFormatOptionStringInJson(rel), attNum,
+        (const char **)columnName, columnDataType, columnDataTypeMod, NULL,
+        rel->rd_rel->relname.data);
   } else if (RelationIsExternal(rel)) {
     TupleDesc tableAttrs = rel->rd_att;
     attNum = tableAttrs->natts;
@@ -1394,7 +1435,7 @@ void do_convert_onetbl_to_common_plan(Oid relid, 
CommonPlanContext *ctx) {
     univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, location,
                                   fmtOptsJson, attNum,
                                   (const char **)columnName, columnDataType,
-                                  columnDataTypeMod, targetName);
+                                  columnDataTypeMod, targetName, NULL);
 
     if (fmtOptsJson != NULL)
       pfree(fmtOptsJson);
@@ -1419,6 +1460,53 @@ end:
   pfree(columnDataTypeMod);
 }
 
+static void do_convert_result_partition_rule_to_common_plan(
+    CommonPlanContext *ctx, PartitionRule *partitionRule,
+    bool isDefaultPartition) {
+  if (partitionRule->children) {
+    // TODO(chiyang): sub-partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanResultPartitionsAddPartitionRule(
+      ctx->univplan, partitionRule->parchildrelid, partitionRule->parname,
+      isDefaultPartition);
+
+  ListCell *lc;
+  foreach (lc, partitionRule->parlistvalues) {
+    univPlanPartitionRuleAddPartitionValue(ctx->univplan, isDefaultPartition);
+    List *partitionListValues = (List *)lfirst(lc);
+    ListCell *lc;
+    foreach (lc, partitionListValues) {
+      Const *val = (List *)lfirst(lc);
+      do_convert_expr_to_common_plan(-1, val, ctx);
+      univPlanPartitionValueAddConst(ctx->univplan, isDefaultPartition);
+    }
+  }
+}
+
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx) {
+  if (partitionNode->part->parkind != 'l') {
+    // TODO(chiyang): range partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanAddResultPartitions(ctx->univplan, partitionNode->part->parrelid,
+                              partitionNode->part->parkind,
+                              partitionNode->part->paratts,
+                              partitionNode->part->parnatts);
+  ListCell *lc;
+  foreach (lc, partitionNode->rules) {
+    PartitionRule *partitionRule = (PartitionRule *)lfirst(lc);
+    do_convert_result_partition_rule_to_common_plan(ctx, partitionRule, false);
+  }
+  if (partitionNode->default_part) {
+    do_convert_result_partition_rule_to_common_plan(
+        ctx, partitionNode->default_part, true);
+  }
+}
+
 void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
   HASH_SEQ_STATUS status;
   struct FileSystemCredential *entry;
@@ -1453,14 +1541,14 @@ void 
do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
 // it's convertible and it's a magma scan
 void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx) {
   // start transaction in magma for SELECT in new executor
-  if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
-    PlugStorageBeginTransaction(NULL);
-  }
+  // if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
+  //   PlugStorageStartTransaction(NULL);
+  // }
   Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   int32_t size = 0;
   char *snapshot = NULL;
-  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(), 
&snapshot,
-                                 &size);
+  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(NULL),
+                                 &snapshot, &size);
   if (snapshot && size != 0) {
     univPlanAddSnapshot(ctx->univplan, snapshot, size);
   }
@@ -1959,15 +2047,14 @@ end:
 }
 
 bool checkInsertSupportTable(PlannedStmt *stmt) {
-  // disable partitioned result target
-  if (stmt->result_partitions)
-    return false;
-  if (list_length(stmt->resultRelations) > 1)
-    return false;
+  if (list_length(stmt->resultRelations) > 1) return false;
   int32_t index = list_nth_int(stmt->resultRelations, 0);
   RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1);
 
-  // if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+  if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+
+  // disable partition table insert for external table
+  if (stmt->result_partitions) return false;
 
   Relation pgExtTableRel = heap_open(ExtTableRelationId, RowExclusiveLock);
   cqContext cqc;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be53f20..ce75706 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -8112,6 +8112,11 @@ static Query *transformIndexStmt(ParseState *pstate, 
IndexStmt *stmt,
 
     if (RelationBuildPartitionDesc(rel, false)) stmt->do_part = true;
 
+    /* native orc can't create index in parent relation */
+    if (RelationIsOrc(rel) && stmt->do_part)
+               ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+                               errmsg("Cannot support create index statement 
in native orc parent relation yet")));
+
     if (stmt->do_part && Gp_role != GP_ROLE_EXECUTE) {
       List *children;
       struct HTAB *nameCache;
diff --git a/src/backend/storage/buffer/bufmgr.c 
b/src/backend/storage/buffer/bufmgr.c
index ed4b5dc..ee8958d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -67,6 +67,7 @@
 #include "cdb/cdbpersistentrelfile.h"
 
 #include "access/aosegfiles.h"
+#include "access/orcsegfiles.h"
 #include "access/parquetsegfiles.h"
 #include "cdb/cdbappendonlyam.h"
 #include "cdb/cdbvars.h"
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 27e7894..89d20e0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -17,6 +17,7 @@
 #include "postgres.h"
 #include "port.h"
 
+#include "access/aosegfiles.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/fileam.h"
@@ -415,6 +416,72 @@ QueryIsReadOnly(Query *parsetree)
 }
 
 /*
+ * CanCreateIndex: can support create index
+ * So far, magma table and native orc could support index
+ */
+void CanSupportIndex(IndexStmt *stmt, Oid relid)
+{
+       /* 1. upgrade mode should support index operation */
+       if (gp_upgrade_mode) return;
+
+       bool supportIndex = false;
+       Relation rel = heap_open(relid, AccessShareLock);
+       bool nativeOrc = RelationIsOrc(rel);
+       heap_close(rel, AccessShareLock);
+
+       /*
+        * 2. deal magma table and native orc
+        * for "stmt->magma", deal with special partition situation, oushu 
issue #1049
+        * its ugly, but there is no elegant way now
+        */
+       if (RelationIsMagmaTable2(relid) || stmt->magma || nativeOrc)
+       {
+                       supportIndex = true;
+                       if (nativeOrc)
+                       {
+                               /* add pg_aoseg.pg_orcseg_idx_xxx and its index 
pg_aoseg.pg_orcseg_idx_xxx_index */
+                               AlterTableCreateAoSegIndexTableWithOid(relid, 
stmt->is_part_child);
+                       }
+       }
+       if (supportIndex)
+       {
+               /*
+                * 3. magma/native orc index cant support the accessory 
conditions
+                */
+               if (stmt->options) {
+                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+                                                                       
errmsg("magma/native orc Index cannot support create index with clause")));
+               }
+               if (stmt->whereClause) {
+                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+                                                                       
errmsg("magma/native orc Index cannot support create index where predicate")));
+               }
+               if (stmt->tableSpace) {
+                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+                                                                       
errmsg("magma/native orc Index cannot support create index with tableSpace")));
+               }
+               ListCell   *cell;
+               foreach(cell, stmt->indexParams)
+               {
+                       IndexElem *elem = (IndexElem *) lfirst(cell);
+                       if (elem->expr || elem->opclass)
+                       {
+                               ereport(ERROR,
+                                                               
(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+                                                                               
errmsg("magma/native orc Index cannot support create index with expr or 
opclass")));
+                       }
+               }
+       }
+       else
+       {
+               ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), 
errmsg("Cannot support create index statement yet")));
+       }
+}
+
+/*
  * CommandIsReadOnly: is an executable query read-only?
  *
  * This is a much stricter test than we apply for XactReadOnly mode;
@@ -1349,83 +1416,13 @@ ProcessUtility(Node *parsetree,
                                lockmode = stmt->concurrent ? 
ShareUpdateExclusiveLock
                                                : ShareLock;
                                relid = RangeVarGetRelid(stmt->relation, false, 
false/*allowHcatalog*/);
-
-                               /* Only create index for external table with 
magma */
                                Assert(OidIsValid(relid));
-                               char *formatOpt = caql_getcstring(
-                                       NULL,
-                                       cql("SELECT fmtopts FROM pg_exttable 
WHERE reloid = :1",
-                                       ObjectIdGetDatum(relid)));
-
-                               if (!formatOpt)
-                               {
-                                 if (stmt->magma) {}
-                                 else if (!gp_upgrade_mode)
-                                       {
-                                               ereport(ERROR,
-                                               
(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index 
statement yet") ));
-                                       }
-                               }
-                               else
-                               {
-                                       char *formatName = 
getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-                                       if (!formatName)
-                                       {
-                                               if (!gp_upgrade_mode)
-                                               {
-                                                       ereport(ERROR,
-                                                                               
        (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create 
index statement yet") ));
-                                               }
-                                       }
-                                       /* in order to support magmatp/magmaap 
*/
-                                       else if ((pg_strncasecmp(formatName, 
FORMAT_MAGMA_TP_STR,
-                                                                               
                                                         
sizeof(FORMAT_MAGMA_TP_STR)-1) == 0) ||
-                                                       
(pg_strncasecmp(formatName, FORMAT_MAGMA_AP_STR,
-                                                                               
                                        sizeof(FORMAT_MAGMA_AP_STR)-1) == 0))
-                                       {
-                                               if (stmt->options) {
-                                                       ereport(ERROR,
-                                                                               
        (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                                                                               
                        errmsg("magma Index cannot support create index with 
clause")));
-                                               }
-                                               if (stmt->whereClause) {
-                                                       ereport(ERROR,
-                                                                               
        (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                                                                               
                        errmsg("magma Index cannot support create index where 
predicate")));
-                                               }
-                                               if (stmt->tableSpace) {
-                                                       ereport(ERROR,
-                                                                               
        (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                                                                               
                        errmsg("magma Index cannot support create index with 
tableSpace")));
-                                               }
-                                               ListCell   *cell;
-                                               foreach(cell, stmt->indexParams)
-                                               {
-                                                       IndexElem *elem = 
(IndexElem *) lfirst(cell);
-                                                       if (elem->expr || 
elem->opclass)
-                                                       {
-                                                               ereport(ERROR,
-                                                                               
                (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                                                                               
                                errmsg("magma Index cannot support create index 
with expr or opclass")));
-                                                       }
-                                               }
-                                               pfree(formatName);
-                                               // break;
-                                       }
-                                       else
-                                       {
-                                               pfree(formatName);
-                                               if (!gp_upgrade_mode)
-                                               {
-                                                       ereport(ERROR,
-                                                               
(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index 
statement yet") ));
-                                               }
-                                       }
-                               }
-
                                LockRelationOid(relid, lockmode);
                                CheckRelationOwnership(relid, true);
 
+                               // check whether can support index
+                               CanSupportIndex(stmt, relid);
+
                                DefineIndex(relid,              /* relation */
                                                        stmt->idxname,          
/* index name */
                                                        InvalidOid, /* no 
predefined OID */
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index de91837..a5e327e 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -222,7 +222,7 @@ calculate_database_size(Oid dbOid)
     /* start transaction for magma table */
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
diff --git a/src/backend/utils/gp/segadmin.c b/src/backend/utils/gp/segadmin.c
index ab4bff0..a615242 100644
--- a/src/backend/utils/gp/segadmin.c
+++ b/src/backend/utils/gp/segadmin.c
@@ -309,13 +309,16 @@ Datum hawq_magma_status(PG_FUNCTION_ARGS)
          }
          tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
          // free memory
-         free(data->magmaNodes[funcctx->call_cntr].node);
-         free(data->magmaNodes[funcctx->call_cntr].dirs);
+         if (data->magmaNodes[funcctx->call_cntr].node)
+           free(data->magmaNodes[funcctx->call_cntr].node);
+         if (data->magmaNodes[funcctx->call_cntr].dirs)
+           free(data->magmaNodes[funcctx->call_cntr].dirs);
          result = HeapTupleGetDatum(tuple);
          SRF_RETURN_NEXT(funcctx, result);
        } else {
          // free memory
-         free(data->magmaNodes);
+         if (data->magmaNodes)
+           free(data->magmaNodes);
          SRF_RETURN_DONE(funcctx);
        }
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4bb606b..20f8f1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4678,6 +4678,15 @@ static struct config_int ConfigureNamesInt[] =
                10000, 0, 65535, NULL, NULL
        },
 
+  {
+    {"new_executor_external_sort_memory_limit", PGC_USERSET, 
QUERY_TUNING_OTHER,
+      gettext_noop("Sets the memory usage (in MB) limit of external sort for 
new executor."),
+      NULL
+    },
+    &new_executor_external_sort_memory_limit_size_mb,
+    256, 0, 1024, NULL, NULL
+  },
+
        {
                {"default_magma_hash_table_nvseg_per_seg", PGC_USERSET, 
QUERY_TUNING_OTHER,
                        gettext_noop("Sets default vseg number per node for 
Magma hash table"),
@@ -7490,7 +7499,6 @@ static struct config_string ConfigureNamesString[] =
                "AUTO", assign_new_executor_mode, NULL
        },
 
-
        {
                {"new_scheduler", PGC_USERSET, EXTERNAL_TABLES,
                        gettext_noop("Enable the new scheduler."),
diff --git a/src/include/access/orcsegfiles.h b/src/include/access/orcsegfiles.h
index 4a87981..f8b4582 100644
--- a/src/include/access/orcsegfiles.h
+++ b/src/include/access/orcsegfiles.h
@@ -30,6 +30,11 @@
 #define Anum_pg_orcseg_tupcount 3
 #define Anum_pg_orcseg_eofuncompressed 4
 
+#define Natts_pg_orcseg_idx 3
+#define Anum_pg_orcseg_idx_idxoid 1
+#define Anum_pg_orcseg_idx_segno 2
+#define Anum_pg_orcseg_idx_eof 3
+
 extern void insertInitialOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo);
 extern void insertOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo,
                                 float8 tupleCount, float8 eof,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4b825ac..93e922d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -188,10 +188,15 @@ typedef enum PlugStorageTransactionStatus
 
 typedef enum PlugStorageTransactionCommand
 {
-       PS_TXN_CMD_BEGIN   = 0,
-       PS_TXN_CMD_COMMIT  = 1,
-       PS_TXN_CMD_ABORT   = 2,
-       PS_TXN_CMD_INVALID = 3
+    PS_TXN_CMD_START_TRANSACTION        = 0,
+    PS_TXN_CMD_COMMIT_TRANSACTION       = 1,
+    PS_TXN_CMD_ABORT_TRANSACTION        = 2,
+    PS_TXN_CMD_START_SUB_TRANSACTION    = 3,
+    PS_TXN_CMD_COMMIT_SUB_TRANSACTION   = 4,
+    PS_TXN_CMD_ABORT_SUB_TRANSACTION    = 5,
+    PS_TXN_CMD_GET_SNAPSHOT             = 6,
+    PS_TXN_CMD_GET_TRANSACTIONID        = 7,
+    PS_TXN_CMD_INVALID                  = 8
 } PlugStorageTransactionCommand;
 
 typedef struct PlugStorageTransactionData
@@ -202,7 +207,8 @@ typedef struct PlugStorageTransactionData
        TransactionId                  pst_transaction_id;
        PlugStorageTransactionStatus   pst_transaction_status;
        PlugStorageTransactionCommand  pst_transaction_command;
-       MagmaSnapshot                 *pst_transaction_dist;     /* magma 
format */
+       MagmaSnapshot                 *pst_transaction_snapshot;     /* magma 
format */
+       MagmaTransactionState         *pst_transaction_state;        /* magma 
format */
 } PlugStorageTransactionData;
 
 typedef PlugStorageTransactionData *PlugStorageTransaction;
@@ -217,9 +223,10 @@ extern bool isCleanupAbortTransaction;
  */
 extern PlugStorageTransaction PlugStorageGetTransaction(void);
 extern PlugStorageTransactionStatus PlugStorageGetTransactionStatus(void);
-extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(void);
 extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot);
-extern void PlugStorageBeginTransaction(List* magmaTableFullNames);
+extern void PlugStorageStartTransaction();
+extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(List* 
magmaTableFullNames);
+extern void PlugStorageGetTransactionId(List* magmaTableFullNames);
 extern void PlugStorageCommitTransaction(void);
 extern void PlugStorageAbortTransaction(void);
 extern void PlugStorageSetIsCleanupAbort(bool isCleanup);
diff --git a/src/include/catalog/aoseg.h b/src/include/catalog/aoseg.h
index 0e3d5ff..e653fb7 100644
--- a/src/include/catalog/aoseg.h
+++ b/src/include/catalog/aoseg.h
@@ -34,6 +34,7 @@ extern void AlterTableCreateAoSegTableWithOid(Oid relOid, Oid 
newOid,
                                                                                
          Oid newIndexOid,
                                                                                
          Oid *comptypeOid,
                                                                                
          bool is_part_child);
+extern void AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool 
is_part_child);
 
 extern void gpsql_appendonly_segfile_create(PG_FUNCTION_ARGS);
 
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index 50b6c0d..8284945 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -354,4 +354,6 @@ extern void CleanUpNewInterconnect();
 
 extern void ResetRpcClientInstance();
 
+extern void checkQDConnectionAlive(void);
+
 #endif   /* ML_IPC_H */
diff --git a/src/include/cwrapper/magma/cwrapper/magma-client-c.h 
b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
index bef1a13..9eed545 100644
--- a/src/include/cwrapper/magma/cwrapper/magma-client-c.h
+++ b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
@@ -127,6 +127,34 @@ __attribute__((weak)) typedef struct MagmaColumn {
   int32_t id;
 } MagmaColumn;
 
+__attribute__((weak)) typedef int BackendId;
+
+__attribute__((weak)) typedef uint32_t LocalTransactionId;
+
+__attribute__((weak)) typedef struct VirtualTransactionId {
+  BackendId backendId;
+  LocalTransactionId localTransactionId;
+} VirtualTransactionId;
+
+__attribute__((weak)) typedef uint64_t MagmaTransactionId;
+
+__attribute__((weak)) typedef uint8_t TransactionStatus;
+
+__attribute__((weak)) struct MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaRgIds MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaTransactionState {
+  VirtualTransactionId
+      virtualTransactionId;  // used for 'magma lock', generated on magma
+  // client startTransaction
+  MagmaTransactionId transactionId;  // useless for read only transaction
+  uint32_t commandId;                // useless for read only transaction
+  TransactionStatus state;
+  MagmaRgIds *relatedRgIds;
+  MagmaTxnAction currentTransaction;
+} MagmaTransactionState;
+
 __attribute__((weak)) typedef struct MagmaReplicaGroup {
   uint32_t id;
   uint16_t port;
@@ -144,8 +172,6 @@ __attribute__((weak)) typedef void *MagmaTablePtr;
 __attribute__((weak)) typedef void *MagmaRangeDistPtr;
 __attribute__((weak)) typedef void *MagmaRangePtr;
 
-__attribute__((weak)) typedef struct MagmaTransactionState 
MagmaTransactionState;
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/include/cwrapper/univplan/cwrapper/univplan-c.h 
b/src/include/cwrapper/univplan/cwrapper/univplan-c.h
index 34f4c88..a52c829 100644
--- a/src/include/cwrapper/univplan/cwrapper/univplan-c.h
+++ b/src/include/cwrapper/univplan/cwrapper/univplan-c.h
@@ -72,7 +72,8 @@ __attribute__((weak)) void 
univPlanRangeTblEntryAddTable(UnivPlanC *up, uint64_t
                                    const char **columnName,
                                    int32_t *columnDataType,
                                    int64_t *columnDataTypeMod,
-                                   const char *targetName) {}
+                                   const char *targetName,
+                                   const char *tableName) {}
 __attribute__((weak)) void univPlanRangeTblEntryAddDummy(UnivPlanC *up) {}
 
 // construct interconnect info
diff --git a/src/include/optimizer/newPlanner.h 
b/src/include/optimizer/newPlanner.h
index 97bcdf6..6067169 100644
--- a/src/include/optimizer/newPlanner.h
+++ b/src/include/optimizer/newPlanner.h
@@ -41,6 +41,7 @@ extern char *new_executor_enable_partitioned_hashjoin_mode;
 extern char *new_executor_enable_external_sort_mode;
 extern int new_executor_partitioned_hash_recursive_depth_limit;
 extern int new_executor_ic_tcp_client_limit_per_query_per_segment;
+extern int new_executor_external_sort_memory_limit_size_mb;
 
 extern const char *new_executor_runtime_filter_mode;
 extern const char *new_executor_runtime_filter_mode_local;
diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h
index 99eec7c..776cf35 100644
--- a/src/include/tcop/utility.h
+++ b/src/include/tcop/utility.h
@@ -43,4 +43,6 @@ extern void CheckRelationOwnership(Oid relOid, bool 
noCatalogs);
 
 extern void DropErrorMsgNonExistent(const RangeVar *rel, char rightkind, bool 
missing_ok);
 
+extern void CanSupportIndex(IndexStmt *stmt, Oid relid);
+
 #endif   /* UTILITY_H */

Reply via email to