This is an automated email from the ASF dual-hosted git repository.
chenjinbao1989 pushed a commit to branch cbdb-postgres-merge
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/cbdb-postgres-merge by this
push:
new 58168417285 Fix conflicts for replication
58168417285 is described below
commit 58168417285caa2dc48bdbd85b0f366a52ac32e2
Author: Jinbao Chen <[email protected]>
AuthorDate: Sat Jul 26 14:42:26 2025 +0800
Fix conflicts for replication
---
.../libpqwalreceiver/libpqwalreceiver.c | 4 -
src/backend/replication/logical/decode.c | 9 +-
src/backend/replication/logical/launcher.c | 6 -
src/backend/replication/logical/origin.c | 11 +-
src/backend/replication/logical/relation.c | 37 -----
src/backend/replication/logical/reorderbuffer.c | 12 --
src/backend/replication/logical/tablesync.c | 11 +-
src/backend/replication/logical/worker.c | 163 +--------------------
src/backend/replication/pgoutput/pgoutput.c | 100 -------------
src/backend/replication/repl_gram.y | 18 ---
src/backend/replication/repl_scanner.l | 10 --
src/backend/replication/slot.c | 28 ----
src/backend/replication/slotfuncs.c | 3 -
src/backend/replication/syncrep.c | 72 +--------
src/backend/replication/walreceiver.c | 39 -----
src/backend/replication/walsender.c | 34 +----
src/include/storage/buffile.h | 2 +-
17 files changed, 15 insertions(+), 544 deletions(-)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index ec9d352c440..560e432be2c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -36,7 +36,6 @@
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
-<<<<<<< HEAD
/*
* In PostgreSQL, this is a dynamically loaded module, because PostgreSQL
* doesn't want to link libpq statically into the backend. In GPDB, we have
@@ -44,9 +43,6 @@
* compiled and linked directly as part of the postgres binary, like any
* other backend .c file.
*/
-=======
-PG_MODULE_MAGIC;
->>>>>>> REL_16_9
struct WalReceiverConn
{
diff --git a/src/backend/replication/logical/decode.c
b/src/backend/replication/logical/decode.c
index 227911d70e8..0004786e359 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -92,11 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *recor
{
XLogRecordBuffer buf;
TransactionId txid;
-<<<<<<< HEAD
- RmgrData rmgr;
-=======
RmgrData rmgr;
->>>>>>> REL_16_9
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
@@ -194,14 +190,11 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer
*buf)
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
case XLOG_FPI:
-<<<<<<< HEAD
/* GPDB_14_MERGE_FIXME: see pg_control.h, Compatible, Figure
out whether 0xC0 already used? */
case XLOG_NEXTRELFILENODE:
case XLOG_OVERWRITE_CONTRECORD:
case XLOG_ENCRYPTION_LSN:
-=======
- case XLOG_OVERWRITE_CONTRECORD:
->>>>>>> REL_16_9
break;
default:
elog(ERROR, "unexpected RM_XLOG_ID record type: %u",
info);
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index 6c43c28a731..8395ae7b23c 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -389,15 +389,9 @@ retry:
}
/*
-<<<<<<< HEAD
- * We don't allow to invoke more sync workers once we have reached the
sync
- * worker limit per subscription. So, just return silently as we might
get
- * here because of an otherwise harmless race condition.
-=======
* We don't allow to invoke more sync workers once we have reached the
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
->>>>>>> REL_16_9
*/
if (OidIsValid(relid) && nsyncworkers >=
max_sync_workers_per_subscription)
{
diff --git a/src/backend/replication/logical/origin.c
b/src/backend/replication/logical/origin.c
index c6c445a8ca1..7d1d88a4d92 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1272,26 +1272,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
-<<<<<<< HEAD
- /* Replication origins "pg_xxx|gp_xxx" are reserved for internal use */
- if (IsReservedName(name) || IsReservedGpName(name))
-=======
/*
* Replication origins "any" and "none" are reserved for system options.
* The origins "pg_xxx" are reserved for internal use.
*/
- if (IsReservedName(name) || IsReservedOriginName(name))
->>>>>>> REL_16_9
+ if (IsReservedName(name) || IsReservedOriginName(name) ||
IsReservedGpName(name))
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("replication origin name \"%s\" is
reserved",
name),
-<<<<<<< HEAD
- errdetail("Origin names starting with \"%s\"
are reserved.", GetReservedPrefix(name))));
-=======
errdetail("Origin names \"%s\", \"%s\", and
names starting with \"pg_\" are reserved.",
LOGICALREP_ORIGIN_ANY,
LOGICALREP_ORIGIN_NONE)));
->>>>>>> REL_16_9
/*
* If built with appropriate switch, whine when regression-testing
diff --git a/src/backend/replication/logical/relation.c
b/src/backend/replication/logical/relation.c
index 39156d6afc3..2e0cad1d8a2 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,10 +17,7 @@
#include "postgres.h"
-<<<<<<< HEAD
-=======
#include "access/genam.h"
->>>>>>> REL_16_9
#include "access/table.h"
#include "catalog/namespace.h"
#include "catalog/pg_am_d.h"
@@ -385,11 +382,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE
lockmode)
/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
-<<<<<<< HEAD
pfree(entry->attrmap);
-=======
- free_attrmap(entry->attrmap);
->>>>>>> REL_16_9
entry->attrmap = NULL;
}
@@ -528,8 +521,6 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
while ((entry = (LogicalRepPartMapEntry *)
hash_seq_search(&status)) != NULL)
entry->relmapentry.localrelvalid = false;
-<<<<<<< HEAD
-=======
}
}
@@ -564,7 +555,6 @@ logicalrep_partmap_reset_relmap(LogicalRepRelation
*remoterel)
logicalrep_relmap_free_entry(entry);
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
->>>>>>> REL_16_9
}
}
@@ -628,25 +618,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
entry = &part_entry->relmapentry;
-<<<<<<< HEAD
if (found && entry->localrelvalid)
return entry;
-=======
- /*
- * We must always overwrite entry->localrel with the latest partition
- * Relation pointer, because the Relation pointed to by the old value
may
- * have been cleared after the caller would have closed the partition
- * relation after the last use of this entry. Note that localrelvalid
is
- * only updated by the relcache invalidation callback, so it may still
be
- * true irrespective of whether the Relation pointed to by localrel has
- * been cleared or not.
- */
- if (found && entry->localrelvalid)
- {
- entry->localrel = partrel;
- return entry;
- }
->>>>>>> REL_16_9
/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
@@ -657,16 +630,6 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
part_entry->partoid = partOid;
}
-<<<<<<< HEAD
-=======
- /* Release the no-longer-useful attrmap, if any. */
- if (entry->attrmap)
- {
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
- }
-
->>>>>>> REL_16_9
if (!entry->remoterel.remoteid)
{
int i;
diff --git a/src/backend/replication/logical/reorderbuffer.c
b/src/backend/replication/logical/reorderbuffer.c
index 45134cd202d..fa04e829cc9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -655,13 +655,8 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId
xid, bool create,
}
/*
-<<<<<<< HEAD
- * If the cache wasn't hit or it yielded a "does-not-exist" and we want
- * to create an entry.
-=======
* If the cache wasn't hit or it yielded a "does-not-exist" and we want
to
* create an entry.
->>>>>>> REL_16_9
*/
/* search the lookup table */
@@ -3208,14 +3203,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
* Update the total size in top level as well. This is later used to
* compute the decoding stats.
*/
-<<<<<<< HEAD
- if (txn->toptxn != NULL)
- toptxn = txn->toptxn;
- else
- toptxn = txn;
-=======
toptxn = rbtxn_get_toptxn(txn);
->>>>>>> REL_16_9
if (addition)
{
diff --git a/src/backend/replication/logical/tablesync.c
b/src/backend/replication/logical/tablesync.c
index 131fca4f1cd..013e1235132 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -784,14 +784,9 @@ fetch_remote_table_info(char *nspname, char *relname,
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
-<<<<<<< HEAD
- Oid tableRow[3] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID,
BOOLOID};
-=======
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
->>>>>>> REL_16_9
bool isnull;
char relkind;
int natt;
@@ -1242,11 +1237,7 @@ copy_table(Relation rel)
NULL, false, false);
attnamelist = make_copy_attnamelist(relmapentry);
-<<<<<<< HEAD
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data,
NULL, attnamelist, NIL);
-=======
- cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data,
attnamelist, options);
->>>>>>> REL_16_9
+ cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data,
NULL, attnamelist, options);
/* Do the copy */
(void) CopyFrom(cstate);
diff --git a/src/backend/replication/logical/worker.c
b/src/backend/replication/logical/worker.c
index f132ac56fda..564177a5e40 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,15 +221,6 @@ typedef struct FlushPosition
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
-<<<<<<< HEAD
-typedef struct SlotErrCallbackArg
-{
- LogicalRepRelMapEntry *rel;
- int remote_attnum;
-} SlotErrCallbackArg;
-
-=======
->>>>>>> REL_16_9
typedef struct ApplyExecutionData
{
EState *estate; /* executor state, used to
track resources */
@@ -407,14 +398,6 @@ static void send_feedback(XLogRecPtr recvpos, bool force,
bool requestReply);
static void DisableSubscriptionAndExit(void);
-<<<<<<< HEAD
-static void maybe_reread_subscription(void);
-
-/* prototype needed because of stream_commit */
-static void apply_dispatch(StringInfo s);
-
-=======
->>>>>>> REL_16_9
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -819,29 +802,6 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState
*estate,
}
/*
-<<<<<<< HEAD
- * Error callback to give more context info about data conversion failures
- * while reading data from the remote server.
- */
-static void
-slot_store_error_callback(void *arg)
-{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
- LogicalRepRelMapEntry *rel;
-
- /* Nothing to do if remote attribute number is not set */
- if (errarg->remote_attnum < 0)
- return;
-
- rel = errarg->rel;
- errcontext("processing remote data for replication target relation
\"%s.%s\" column \"%s\"",
- rel->remoterel.nspname, rel->remoterel.relname,
- rel->remoterel.attnames[errarg->remote_attnum]);
-}
-
-/*
-=======
->>>>>>> REL_16_9
* Store tuple data into slot.
*
* Incoming data can be either text or binary format.
@@ -855,17 +815,6 @@ slot_store_data(TupleTableSlot *slot,
LogicalRepRelMapEntry *rel,
ExecClearTuple(slot);
-<<<<<<< HEAD
- /* Push callback + info on the error context stack */
- errarg.rel = rel;
- errarg.remote_attnum = -1;
- errcallback.callback = slot_store_error_callback;
- errcallback.arg = (void *) &errarg;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
-=======
->>>>>>> REL_16_9
/* Call the "in" function for each non-dropped, non-null attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
@@ -879,12 +828,8 @@ slot_store_data(TupleTableSlot *slot,
LogicalRepRelMapEntry *rel,
Assert(remoteattnum < tupleData->ncols);
-<<<<<<< HEAD
- errarg.remote_attnum = remoteattnum;
-=======
/* Set attnum for error callback */
apply_error_callback_arg.remote_attnum = remoteattnum;
->>>>>>> REL_16_9
if (tupleData->colstatus[remoteattnum] ==
LOGICALREP_COLUMN_TEXT)
{
@@ -932,12 +877,8 @@ slot_store_data(TupleTableSlot *slot,
LogicalRepRelMapEntry *rel,
slot->tts_isnull[i] = true;
}
-<<<<<<< HEAD
- errarg.remote_attnum = -1;
-=======
/* Reset attnum for error callback */
apply_error_callback_arg.remote_attnum = -1;
->>>>>>> REL_16_9
}
else
{
@@ -986,7 +927,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot
*srcslot,
memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
-<<<<<<< HEAD
/* For error reporting, push callback + info on the error context stack
*/
errarg.rel = rel;
errarg.remote_attnum = -1;
@@ -995,8 +935,6 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot
*srcslot,
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
-=======
->>>>>>> REL_16_9
/* Call the "in" function for each replaced attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
@@ -1013,12 +951,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot
*srcslot,
{
StringInfo colvalue =
&tupleData->colvalues[remoteattnum];
-<<<<<<< HEAD
- errarg.remote_attnum = remoteattnum;
-=======
/* Set attnum for error callback */
apply_error_callback_arg.remote_attnum = remoteattnum;
->>>>>>> REL_16_9
if (tupleData->colstatus[remoteattnum] ==
LOGICALREP_COLUMN_TEXT)
{
@@ -1062,12 +996,8 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot
*srcslot,
slot->tts_isnull[i] = true;
}
-<<<<<<< HEAD
- errarg.remote_attnum = -1;
-=======
/* Reset attnum for error callback */
apply_error_callback_arg.remote_attnum = -1;
->>>>>>> REL_16_9
}
}
@@ -2214,12 +2144,8 @@ apply_spooled_messages(FileSet *stream_fileset,
TransactionId xid,
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
-<<<<<<< HEAD
- apply_handle_commit_internal(&commit_data);
-=======
return;
}
->>>>>>> REL_16_9
/*
* Handle STREAM COMMIT message.
@@ -3344,11 +3270,8 @@ apply_handle_truncate(StringInfo s)
relids_logged,
DROP_RESTRICT,
restart_seqs,
-<<<<<<< HEAD
+ !MySubscription->runasowner,
NULL);
-=======
- !MySubscription->runasowner);
->>>>>>> REL_16_9
foreach(lc, remote_rels)
{
LogicalRepRelMapEntry *rel = lfirst(lc);
@@ -4093,6 +4016,7 @@ subxact_info_write(Oid subid, TransactionId xid)
char path[MAXPGPATH];
Size len;
BufFile *fd;
+ workfile_set *work_set;
Assert(TransactionIdIsValid(xid));
@@ -4112,33 +4036,14 @@ subxact_info_write(Oid subid, TransactionId xid)
* Create the subxact file if it not already created, otherwise open the
* existing file.
*/
-<<<<<<< HEAD
- if (ent->subxact_fileset == NULL)
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
O_RDWR,
+ true);
+ if (fd == NULL)
{
- MemoryContext oldctx;
- workfile_set *work_set;
-
- /*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent
context.
- */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(SharedFileSet));
- SharedFileSetInit(ent->subxact_fileset, NULL);
- MemoryContextSwitchTo(oldctx);
-
work_set = workfile_mgr_create_set("Subxact", path, false /*
hold pin */);
+ fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
path, work_set);
- fd = BufFileCreateShared(ent->subxact_fileset, path, work_set);
}
- else
- fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
-=======
- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
O_RDWR,
- true);
- if (fd == NULL)
- fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
path);
->>>>>>> REL_16_9
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -4358,39 +4263,13 @@ stream_open_file(Oid subid, TransactionId xid, bool
first_segment)
* Otherwise, just open the file for writing, in append mode.
*/
if (first_segment)
-<<<<<<< HEAD
{
- MemoryContext savectx;
- SharedFileSet *fileset;
workfile_set *work_set;
- if (found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect
first-segment flag for streamed replication transaction")));
-
- /*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent
context.
- */
- savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(SharedFileSet));
-
- SharedFileSetInit(fileset, NULL);
- MemoryContextSwitchTo(savectx);
-
work_set = workfile_mgr_create_set("LogicalStreaming", path,
false /* hold pin */);
- stream_fd = BufFileCreateShared(fileset, path, work_set);
-
- /* Remember the fileset for the next stream of the same
transaction */
- ent->xid = xid;
- ent->stream_fileset = fileset;
- ent->subxact_fileset = NULL;
- }
-=======
stream_fd =
BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
-
path);
->>>>>>> REL_16_9
+
path, work_set);
+ }
else
{
/*
@@ -4609,30 +4488,6 @@ void
InitializeApplyWorker(void)
{
MemoryContext oldctx;
-<<<<<<< HEAD
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
- WalRcvStreamOptions options;
-
- /* Attach to slot */
- logicalrep_worker_attach(worker_slot);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGTERM, die);
- BackgroundWorkerUnblockSignals();
-
- /*
- * We don't currently need any ResourceOwner in a walreceiver process,
but
- * if we did, we could call CreateAuxProcessResourceOwner here.
- */
-
- /* Initialise stats to a sanish value */
- MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time
=
- MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
- /* Load the libpq-specific functions */
/*
* In GPDB, we build libpqwalreceiver functions, as well as a copy of
* libpq into the backend itself, to support QD-QE communication. See
@@ -4640,8 +4495,6 @@ InitializeApplyWorker(void)
*/
if (!WalReceiverFunctions)
libpqwalreceiver_PG_init();
-=======
->>>>>>> REL_16_9
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
diff --git a/src/backend/replication/pgoutput/pgoutput.c
b/src/backend/replication/pgoutput/pgoutput.c
index df4ac9a123a..32b74bb4752 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -94,10 +94,6 @@ static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
-<<<<<<< HEAD
-
LogicalDecodingContext *ctx);
-static void update_replication_progress(LogicalDecodingContext *ctx);
-=======
LogicalDecodingContext *ctx,
Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
@@ -116,7 +112,6 @@ enum RowFilterPubAction
};
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
->>>>>>> REL_16_9
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -611,9 +606,6 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
-<<<<<<< HEAD
- update_replication_progress(ctx);
-=======
PGOutputTxnData *txndata = (PGOutputTxnData *)
txn->output_plugin_private;
bool sent_begin_txn;
@@ -633,7 +625,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
elog(DEBUG1, "skipped replication of an empty transaction with
XID: %u", txn->xid);
return;
}
->>>>>>> REL_16_9
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -1444,8 +1435,6 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- update_replication_progress(ctx);
-
if (!is_publishable_relation(relation))
return;
@@ -1613,8 +1602,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx);
-
/* Remember the xid for the change in streaming mode. See
pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
@@ -1678,8 +1665,6 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx);
-
if (!data->messages)
return;
@@ -1885,11 +1870,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext
*ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
-<<<<<<< HEAD
- update_replication_progress(ctx);
-=======
OutputPluginUpdateProgress(ctx, false);
->>>>>>> REL_16_9
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -2058,10 +2039,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
int publish_ancestor_level = 0;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
-<<<<<<< HEAD
-=======
List *rel_publications = NIL;
->>>>>>> REL_16_9
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -2154,25 +2132,15 @@ get_rel_sync_entry(PGOutputData *data, Relation
relation)
/*
* Under what relid should we publish changes in this
publication?
-<<<<<<< HEAD
- * We'll use the top-most relid across all
publications. Also track
- * the ancestor level for this publication.
-=======
* We'll use the top-most relid across all
publications. Also
* track the ancestor level for this publication.
->>>>>>> REL_16_9
*/
Oid pub_relid = relid;
int ancestor_level = 0;
/*
-<<<<<<< HEAD
- * If this is a FOR ALL TABLES publication, pick the
partition root
- * and set the ancestor level accordingly.
-=======
* If this is a FOR ALL TABLES publication, pick the
partition
* root and set the ancestor level accordingly.
->>>>>>> REL_16_9
*/
if (pub->alltables)
{
@@ -2201,11 +2169,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
Oid ancestor;
int level;
List *ancestors =
get_partition_ancestors(relid);
-<<<<<<< HEAD
- ListCell *lc2;
- int level = 0;
-=======
->>>>>>> REL_16_9
ancestor =
GetTopMostAncestorInPublication(pub->oid,
ancestors,
@@ -2213,27 +2176,11 @@ get_rel_sync_entry(PGOutputData *data, Relation
relation)
if (ancestor != InvalidOid)
{
-<<<<<<< HEAD
- Oid
ancestor = lfirst_oid(lc2);
-
- level++;
-
- if
(list_member_oid(GetRelationPublications(ancestor),
-
pub->oid))
- {
- ancestor_published =
true;
- if (pub->pubviaroot)
- {
- pub_relid =
ancestor;
- ancestor_level
= level;
- }
-=======
ancestor_published = true;
if (pub->pubviaroot)
{
pub_relid = ancestor;
ancestor_level = level;
->>>>>>> REL_16_9
}
}
}
@@ -2262,27 +2209,14 @@ get_rel_sync_entry(PGOutputData *data, Relation
relation)
/*
* We want to publish the changes as the
top-most ancestor
-<<<<<<< HEAD
- * across all publications. So we need to check
if the
- * already calculated level is higher than the
new one. If
- * yes, we can ignore the new value (as it's a
child).
- * Otherwise the new value is an ancestor, so
we keep it.
-=======
* across all publications. So we need to check
if the already
* calculated level is higher than the new one.
If yes, we can
* ignore the new value (as it's a child).
Otherwise the new
* value is an ancestor, so we keep it.
->>>>>>> REL_16_9
*/
if (publish_ancestor_level > ancestor_level)
continue;
-<<<<<<< HEAD
- /* The new value is an ancestor, so let's keep
it. */
- publish_as_relid = pub_relid;
- publish_ancestor_level = ancestor_level;
- }
-=======
/*
* If we found an ancestor higher up in the
tree, discard the
* list of publications through which we
replicate it, and use
@@ -2324,7 +2258,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications,
entry);
->>>>>>> REL_16_9
}
list_free(pubids);
@@ -2489,36 +2422,3 @@ send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id,
}
}
}
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are not published.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx)
-{
- static int changes_count = 0;
-
- /*
- * We don't want to try sending a keepalive message after processing
each
- * change as that can have overhead. Tests revealed that there is no
- * noticeable overhead in doing it after continuously processing 100 or
so
- * changes.
- */
-#define CHANGES_THRESHOLD 100
-
- /*
- * If we are at the end of transaction LSN, update progress tracking.
- * Otherwise, after continuously processing CHANGES_THRESHOLD changes,
we
- * try to send a keepalive message if required.
- */
- if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
- {
- OutputPluginUpdateProgress(ctx);
- changes_count = 0;
- }
-}
diff --git a/src/backend/replication/repl_gram.y
b/src/backend/replication/repl_gram.y
index af609f5922f..dc826764007 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -66,15 +66,12 @@ Node *replication_parse_result;
%token K_DROP_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_WAIT
-<<<<<<< HEAD
%token K_NOWAIT
%token K_EXCLUDE
%token K_MAX_RATE
%token K_WAL
%token K_TABLESPACE_MAP
%token K_NOVERIFY_CHECKSUMS
-=======
->>>>>>> REL_16_9
%token K_TIMELINE
%token K_PHYSICAL
%token K_LOGICAL
@@ -89,15 +86,9 @@ Node *replication_parse_result;
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot
identify_system
-<<<<<<< HEAD
- timeline_history show
-%type <list> base_backup_opt_list
-%type <defelt> base_backup_opt
-=======
read_replication_slot timeline_history show
%type <list> generic_option_list
%type <defelt> generic_option
->>>>>>> REL_16_9
%type <uintval> opt_timeline
%type <list> plugin_options plugin_opt_list
%type <defelt> plugin_opt_elem
@@ -186,7 +177,6 @@ base_backup:
}
;
-<<<<<<< HEAD
base_backup_opt_list:
base_backup_opt_list base_backup_opt
{ $$ = lappend($1, $2); }
@@ -252,8 +242,6 @@ base_backup_opt:
}
;
-=======
->>>>>>> REL_16_9
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot [TEMPORARY] PHYSICAL
[options] */
K_CREATE_REPLICATION_SLOT IDENT opt_temporary
K_PHYSICAL create_slot_options
@@ -448,11 +436,6 @@ plugin_opt_arg:
| /* EMPTY */ { $$ =
NULL; }
;
-<<<<<<< HEAD
-%%
-
-#include "repl_scanner.c"
-=======
generic_option_list:
generic_option_list ',' generic_option
{ $$ = lappend($1, $3); }
@@ -502,4 +485,3 @@ ident_or_keyword:
;
%%
->>>>>>> REL_16_9
diff --git a/src/backend/replication/repl_scanner.l
b/src/backend/replication/repl_scanner.l
index 73c22fa2d67..4e2edaba095 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -121,7 +121,6 @@ BASE_BACKUP { return K_BASE_BACKUP; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
SHOW { return K_SHOW; }
-<<<<<<< HEAD
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
EXCLUDE { return K_EXCLUDE; }
@@ -130,8 +129,6 @@ MAX_RATE { return K_MAX_RATE; }
WAL { return K_WAL; }
TABLESPACE_MAP { return K_TABLESPACE_MAP; }
NOVERIFY_CHECKSUMS { return K_NOVERIFY_CHECKSUMS; }
-=======
->>>>>>> REL_16_9
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
@@ -216,11 +213,7 @@ WAIT { return K_WAIT; }
return yytext[0];
}
-<<<<<<< HEAD
-<xq,xd><<EOF>> { yyerror("unterminated quoted string"); }
-=======
<xq,xd><<EOF>> { replication_yyerror("unterminated quoted string"); }
->>>>>>> REL_16_9
<<EOF>> {
@@ -316,10 +309,7 @@ replication_scanner_is_replication_command(void)
case K_START_REPLICATION:
case K_CREATE_REPLICATION_SLOT:
case K_DROP_REPLICATION_SLOT:
-<<<<<<< HEAD
-=======
case K_READ_REPLICATION_SLOT:
->>>>>>> REL_16_9
case K_TIMELINE_HISTORY:
case K_SHOW:
/* Yes; push back the first token so we can parse
later. */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 601c944b731..66659343be8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -873,12 +873,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
SpinLockAcquire(&s->mutex);
effective_xmin = s->effective_xmin;
effective_catalog_xmin = s->effective_catalog_xmin;
-<<<<<<< HEAD
- invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
-
XLogRecPtrIsInvalid(s->data.restart_lsn));
-=======
invalidated = s->data.invalidated != RS_INVAL_NONE;
->>>>>>> REL_16_9
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
@@ -1335,14 +1330,10 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause
cause,
* for syscalls, so caller must restart if we return true.
*/
static bool
-<<<<<<< HEAD
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
-=======
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
ReplicationSlot *s,
XLogRecPtr oldestLSN,
Oid dboid,
TransactionId snapshotConflictHorizon,
->>>>>>> REL_16_9
bool *invalidated)
{
int last_signaled_pid = 0;
@@ -1458,10 +1449,6 @@
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
-<<<<<<< HEAD
- s->data.invalidated_at = restart_lsn;
- s->data.restart_lsn = InvalidXLogRecPtr;
-=======
s->data.invalidated = conflict;
/*
@@ -1470,7 +1457,6 @@
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
*/
if (conflict == RS_INVAL_WAL_REMOVED)
s->data.restart_lsn = InvalidXLogRecPtr;
->>>>>>> REL_16_9
/* Let caller know */
*invalidated = true;
@@ -1582,12 +1568,6 @@
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
*/
bool
-<<<<<<< HEAD
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
-{
- XLogRecPtr oldestLSN;
- bool invalidated = false;
-=======
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
XLogSegNo
oldestSegno, Oid dboid,
TransactionId snapshotConflictHorizon)
@@ -1601,7 +1581,6 @@
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
if (max_replication_slots == 0)
return invalidated;
->>>>>>> REL_16_9
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
@@ -1614,13 +1593,9 @@ restart:
if (!s->in_use)
continue;
-<<<<<<< HEAD
- if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
-=======
if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
snapshotConflictHorizon,
&invalidated))
->>>>>>> REL_16_9
{
/* if the lock was released, start from scratch */
goto restart;
@@ -1633,12 +1608,9 @@ restart:
*/
if (invalidated)
{
-<<<<<<< HEAD
/* GPDB: Set WalSndCtl state to indicate persistent sync error
state */
WalSndCtl->error = WALSNDERROR_WALREAD;
-=======
->>>>>>> REL_16_9
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN();
}
diff --git a/src/backend/replication/slotfuncs.c
b/src/backend/replication/slotfuncs.c
index 576a2eeadee..e01adb00795 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -26,7 +26,6 @@
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
-<<<<<<< HEAD
static void
check_permissions(void)
{
@@ -43,8 +42,6 @@ warn_slot_only_created_on_segment(const char *name) {
errhint("Creating replication slots on a single
segment is not advised. Replication slots are automatically created by
management tools.")));
}
-=======
->>>>>>> REL_16_9
/*
* Helper function for creating a new physical replication slot with
* given arguments. Note that this function doesn't release the created
diff --git a/src/backend/replication/syncrep.c
b/src/backend/replication/syncrep.c
index 51a7e17a2c3..4b0660e54ce 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -222,12 +222,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* don't touch the queue.
*/
if (!SyncRepRequested() ||
-<<<<<<< HEAD
(!IS_QUERY_DISPATCHER() && !((volatile WalSndCtlData *)
WalSndCtl)->sync_standbys_defined))
-=======
- ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status)
&
- (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) ==
SYNC_STANDBY_INIT)
->>>>>>> REL_16_9
return;
/* Cap the level for anything other than commit to remote flush only. */
@@ -243,7 +238,6 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
/*
-<<<<<<< HEAD
* GPDB special behavior: if the master/coordinator doesn't configure a
standby,
* or the standby is down, or the connection between the
master/coordinator and standby
* is broken, the xlog will not be synchronized to the standby before
the key
@@ -330,12 +324,8 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
}
/*
- * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
- * set. See SyncRepUpdateSyncStandbysDefined.
-=======
 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
 * set. See SyncRepUpdateSyncStandbysDefined().
->>>>>>> REL_16_9
*
* Also check that the standby hasn't already replied. Unlikely race
* condition but we'll be fetching that cache line anyway so it's likely
@@ -345,7 +335,6 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
* (SYNC_STANDBY_INIT is not set), fall back to a check based on the
LSN,
* then do a direct GUC check.
*/
-<<<<<<< HEAD
if (((!IS_QUERY_DISPATCHER()) && !WalSndCtl->sync_standbys_defined) ||
lsn <= WalSndCtl->lsn[mode])
{
@@ -356,43 +345,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
(uint32) (WalSndCtl->lsn[mode] >> 32), (uint32)
WalSndCtl->lsn[mode],
(uint32) (lsn >> 32), (uint32) lsn);
-=======
- if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
- {
- if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) ==
0 ||
- lsn <= WalSndCtl->lsn[mode])
- {
- LWLockRelease(SyncRepLock);
- return;
- }
- }
- else if (lsn <= WalSndCtl->lsn[mode])
- {
- /*
- * The LSN is older than what we need to wait for. The sync
standby
- * data has not been initialized yet, but we are OK to not wait
- * because we know that there is no point in doing so based on
the
- * LSN.
- */
- LWLockRelease(SyncRepLock);
- return;
- }
- else if (!SyncStandbysDefined())
- {
- /*
- * If we are here, the sync standby data has not been
initialized yet,
- * and the LSN is newer than what need to wait for, so we have
fallen
- * back to the best thing we could do in this case: a check on
- * SyncStandbysDefined() to see if the GUC is set or not.
- *
- * When the GUC has a value, we wait until the checkpointer
updates
- * the status data because we cannot be sure yet if we should
wait or
- * not. Here, the GUC has *no* value, we are sure that there is
no
- * point to wait; this matters for example when initializing a
- * cluster, where we should never wait, and no sync standbys is
the
- * default behavior.
- */
->>>>>>> REL_16_9
+
LWLockRelease(SyncRepLock);
return;
}
@@ -415,22 +368,8 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
char buffer[32];
-<<<<<<< HEAD
- old_status = get_real_act_ps_display(&len);
- /*
- * The 32 represents the bytes in the string " waiting for
%X/%X", as
- * in upstream. The 12 represents GPDB specific " replication"
suffix.
- */
- new_status = (char *) palloc(len + 32 + 12 + 1);
- memcpy(new_status, old_status, len);
- sprintf(new_status + len, " waiting for %X/%X",
- LSN_FORMAT_ARGS(lsn));
- set_ps_display(new_status);
- new_status[len] = '\0'; /* truncate off " waiting ..." */
-=======
sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
set_ps_display_suffix(buffer);
->>>>>>> REL_16_9
}
/* Report the wait */
@@ -551,20 +490,11 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
MyProc->waitLSN = 0;
-<<<<<<< HEAD
pgstat_report_wait_end();
- if (new_status)
- {
- /* Reset ps display */
- set_ps_display(new_status);
- pfree(new_status);
- }
-=======
/* reset ps display to remove the suffix */
if (update_process_title)
set_ps_display_remove_suffix();
->>>>>>> REL_16_9
}
/*
diff --git a/src/backend/replication/walreceiver.c
b/src/backend/replication/walreceiver.c
index 2444d79cd7a..317b0312caa 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -140,19 +140,12 @@ static StringInfoData incoming_message;
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID
*startpointTLI);
static void WalRcvDie(int code, Datum arg);
-<<<<<<< HEAD
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
-static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvClose(XLogRecPtr recptr);
-=======
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
TimeLineID
tli);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
TimeLineID tli);
static void XLogWalRcvFlush(bool dying, TimeLineID tli);
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
->>>>>>> REL_16_9
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -929,18 +922,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
TimeLineID tli)
/* Close the current segment if it's completed */
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo,
wal_segment_size))
-<<<<<<< HEAD
- XLogWalRcvClose(recptr);
-
- if (recvFile < 0)
- {
- bool use_existent = true;
-
- /* Create/use new log file */
- XLByteToSeg(recptr, recvSegNo, wal_segment_size);
- recvFile = XLogFileInit(recvSegNo, &use_existent, true);
- recvFileTLI = ThisTimeLineID;
-=======
XLogWalRcvClose(recptr, tli);
if (recvFile < 0)
@@ -949,7 +930,6 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
TimeLineID tli)
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
recvFile = XLogFileInit(recvSegNo, tli);
recvFileTLI = tli;
->>>>>>> REL_16_9
}
/* Calculate the start offset of the received logs */
@@ -1011,11 +991,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr
recptr, TimeLineID tli)
* segment is received and written.
*/
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-<<<<<<< HEAD
- XLogWalRcvClose(recptr);
-=======
XLogWalRcvClose(recptr, tli);
->>>>>>> REL_16_9
}
/*
@@ -1086,29 +1062,18 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
* Create an archive notification file since the segment is known completed.
*/
static void
-<<<<<<< HEAD
-XLogWalRcvClose(XLogRecPtr recptr)
-=======
XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
->>>>>>> REL_16_9
{
char xlogfname[MAXFNAMELEN];
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo,
wal_segment_size));
-<<<<<<< HEAD
-=======
Assert(tli != 0);
->>>>>>> REL_16_9
/*
* fsync() and close current file before we switch to next one. We would
* otherwise have to reopen this file to fsync it later
*/
-<<<<<<< HEAD
- XLogWalRcvFlush(false);
-=======
XLogWalRcvFlush(false, tli);
->>>>>>> REL_16_9
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
@@ -1120,11 +1085,7 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
-<<<<<<< HEAD
- errmsg("could not close log segment %s: %m",
-=======
errmsg("could not close WAL segment %s: %m",
->>>>>>> REL_16_9
xlogfname)));
/*
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index cb2c065a4a7..e3370266438 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -98,14 +98,12 @@
#include "utils/timeout.h"
#include "utils/timestamp.h"
-<<<<<<< HEAD
#include "cdb/cdbvars.h"
#include "replication/gp_replication.h"
#include "utils/faultinjector.h"
-=======
+
/* Minimum interval used by walsender for stats flushes, in ms */
#define WALSENDER_STATS_FLUSH_INTERVAL 1000
->>>>>>> REL_16_9
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
@@ -130,14 +128,9 @@ bool am_cascading_walsender = false; /* Am I
cascading WAL to another
* standby? */
bool am_db_walsender = false; /* Connected to a database? */
-<<<<<<< HEAD
/* User-settable parameters for walsender */
-int repl_catchup_within_range = 0;
-int max_wal_senders = 0; /* the maximum number of
concurrent
-=======
-/* GUC variables */
+int repl_catchup_within_range = 0;
int max_wal_senders = 10; /* the maximum number of
concurrent
->>>>>>> REL_16_9
*
walsenders */
int wal_sender_timeout = 60 * 1000; /* maximum time to send
one WAL
* data message */
@@ -262,14 +255,9 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
-<<<<<<< HEAD
static const char *WalSndGetStateString(WalSndState state);
static void ProcessPendingWrites(void);
-static void WalSndKeepalive(bool requestReply);
-=======
-static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
->>>>>>> REL_16_9
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
@@ -1518,10 +1506,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx,
XLogRecPtr lsn, TransactionId
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
-<<<<<<< HEAD
-=======
bool pending_writes = false;
->>>>>>> REL_16_9
bool end_xact = ctx->end_xact;
/*
@@ -1541,17 +1526,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx,
XLogRecPtr lsn, TransactionId
}
/*
-<<<<<<< HEAD
- * Try to send a keepalive if required. We don't need to try sending
keep
- * alive messages at the transaction end as that will be done at a later
- * point in time. This is required only for large transactions where we
- * don't send any changes to the downstream and the receiver can timeout
- * due to that.
- */
- if (!end_xact &&
- now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
-
wal_sender_timeout / 2))
-=======
* When skipping empty transactions in synchronous replication, we send
a
* keepalive message to avoid delaying such transactions.
*
@@ -1584,7 +1558,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx,
XLogRecPtr lsn, TransactionId
if (pending_writes || (!end_xact &&
now >=
TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2)))
->>>>>>> REL_16_9
ProcessPendingWrites();
}
@@ -2744,11 +2717,9 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
-<<<<<<< HEAD
/* Will be decided in hand-shake */
walsnd->xlogCleanUpTo = InvalidXLogRecPtr;
walsnd->caughtup_within_range = false;
-=======
/*
* The kind assignment is done here and not in
StartReplication()
@@ -2766,7 +2737,6 @@ InitWalSenderSlot(void)
else
walsnd->kind = REPLICATION_KIND_LOGICAL;
->>>>>>> REL_16_9
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 8309ab247b1..1accb0617cc 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -62,7 +62,7 @@ extern BufFile *BufFileOpenShared(SharedFileSet *fileset,
const char *name,
extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
-extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
+extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name,
workfile_set *work_set);
extern void BufFileExportFileSet(BufFile *file);
extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
int mode,
bool missing_ok);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]