zhangyue1818 commented on code in PR #1357:
URL: https://github.com/apache/cloudberry/pull/1357#discussion_r2432405915


##########
contrib/udp2/ic_common/udp2/ic_udp2.cpp:
##########
@@ -0,0 +1,6958 @@
+/*-------------------------------------------------------------------------
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * ic_udp2.cpp
+ *
+ * IDENTIFICATION
+ *    contrib/udp2/ic_common/udp2/ic_udp2.cpp
+ *
+ *               +--------------+
+ *               |  ic_types.h  |
+ *               +--------------+
+ *                  /         \
+ *       +--------------+  +---------------+
+ *       | C interface  |  | C++ interface |
+ *       |  ic_udp2.h   |  |  ic_udp2.hpp  |
+ *       +--------------+  +---------------+
+ *                   \         /
+ *            +----------------------+
+ *            |     C++ implement    |
+ *            |  ic_udp2_internal.hpp|
+ *            |  ic_faultinjection.h |
+ *            |  ic_udp2.cpp         |
+ *            +----------------------+
+ *-------------------------------------------------------------------------
+ */
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <sstream>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+#include <atomic>
+#include <cstdarg>
+
+/*
+ * interface header files
+ */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ic_types.h"
+#include "ic_udp2.h"
+
+#ifdef __cplusplus
+}
+#endif
+
+#include "ic_udp2.hpp"
+
+/*
+ * internal header files
+ */
+#include "ic_utility.hpp"
+#include "ic_udp2_internal.hpp"
+#include "ic_faultinjection.h"
+
+static int timeoutArray[] =
+{
+       1,
+       1,
+       2,
+       4,
+       8,
+       16,
+       32,
+       64,
+       128,
+       256,
+       512,
+       512                                                     /* MAX_TRY */
+};
+
+/*
+ * Main thread (Receiver) and background thread use the information in
+ * this data structure to handle data packets.
+ */
+static ReceiveControlInfo rx_control_info;
+
+/*
+ * The buffer pool used for keeping data packets.
+ *
+ * maxCount is set to 1 to make sure there is always a buffer
+ * for picking packets from OS buffer.
+ */
+static RxBufferPool rx_buffer_pool = {1, 0, NULL};
+
+/*
+ * The sender side buffer pool.
+ */
+static SendBufferPool snd_buffer_pool;
+
+/*
+ * Main thread use the information in this data structure to do ack handling
+ * and congestion control.
+ */
+static SendControlInfo snd_control_info;
+
+/*
+ * Shared control information that is used by senders, receivers and 
background thread.
+ */
+static ICGlobalControlInfo ic_control_info;
+
+/*
+ * All connections in a process share this unack queue ring instance.
+ */
+static UnackQueueRing unack_queue_ring = {0, 0, 0};
+
+static int     ICSenderSocket = -1;
+static int32 ICSenderPort = 0;
+static int     ICSenderFamily = 0;
+
+/* Statistics for UDP interconnect. */
+static ICStatistics ic_statistics;
+
+/* Cached sockaddr of the listening udp socket */
+static struct sockaddr_storage udp_dummy_packet_sockaddr;
+
+/* UDP listen fd */
+static int                     UDP_listenerFd = -1;
+
+/* UDP listen port */
+static int32 udp_listener_port = 0;
+
+static std::mutex mtx;
+static std::condition_variable cv;
+
+CChunkTransportState *CChunkTransportStateImpl::state_ = nullptr;
+
+/*
+ * Identity the user of ic module by vector_engine_is_user:
+ * "false" means PG executor, "true" means Arrow executor.
+ */
+static thread_local bool vector_engine_is_user = false;
+static thread_local bool thread_quit = false;
+
+#define CHECK_QUIT_FLAG() \
+       do { \
+               if (thread_quit) { \
+                       throw ICException("received thread quit flag.", 
__FILE__, __LINE__); \
+               } \
+       } while(0)
+
+#define CHECK_INTERRUPTS(state) \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkInterruptsCallback) { \
+                       
global_param.checkInterruptsCallback((state)->teardownActive); \
+               } \
+       } while(0)
+
+#define CHECK_CANCEL(state) \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkCancelOnQDCallback) { \
+                       global_param.checkCancelOnQDCallback(state); \
+               } \
+       } while(0)
+
+#define CHECK_POSTMASTER_ALIVE() \
+       do { \
+               if (vector_engine_is_user) { \
+                       CHECK_QUIT_FLAG(); \
+               } else if (global_param.checkPostmasterIsAliveCallback && 
!global_param.checkPostmasterIsAliveCallback()) { \
+                       throw ICFatalException("FATAL, interconnect failed to 
send chunks, Postmaster is not alive.", __FILE__, __LINE__); \
+               } \
+       } while(0)
+
+/*=========================================================================
+ * STATIC FUNCTIONS declarations
+ */
+
+/* Background thread error handling functions. */
+static void checkRxThreadError(void);
+static void setRxThreadError(int eno);
+static void resetRxThreadError(void);
+
+static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
+static void setupOutgoingUDPConnection(int icid, TransportEntry *pChunkEntry, 
UDPConn *conn);
+
+/* ICBufferList functions. */
+static inline void icBufferListInitHeadLink(ICBufferLink *link);
+
+static inline void InitMotionUDPIFC(int *listenerSocketFd, int32 
*listenerPort);
+static inline void CleanupMotionUDPIFC(void);
+
+static bool dispatcherAYT(void);
+static void checkQDConnectionAlive(void);
+
+static void *rxThreadFunc(void *arg);
+
+static void putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 
expTime, uint64 now);
+
+static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, 
int peer_len);
+static void cleanupStartupCache(void);
+static void handleCachedPackets(void);
+
+static uint64 getCurrentTime(void);
+static void initMutex(pthread_mutex_t *mutex);
+
+static inline void logPkt(const char *prefix, icpkthdr *pkt);
+
+static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, 
socklen_t *o_len);
+#if defined(__darwin__)
+#define    s6_addr32 __u6_addr.__u6_addr32
+static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest);
+#endif
+
+static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort,
+                           int *txFamily, struct sockaddr_storage 
*listenerSockaddr);
+static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, 
const char *listenerAddr, int listenerPort);
+static void SendDummyPacket(void);
+static bool handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct 
sockaddr_storage *peer, socklen_t *peerlen, AckSendParam *param, bool 
*wakeup_mainthread);
+static bool handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int 
peer_len);
+static void initUnackQueueRing(UnackQueueRing *uqr);
+
+static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, 
int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, 
const char *errDetail);
+
+static char *format_sockaddr_udp(struct sockaddr_storage *sa, char *buf, 
size_t len);
+
+static char* flags2txt(uint32 pkt_flags);
+
+static const char* flags_text[] =
+       {"recv2send", "ack", "stop", "eos", "nak", "disorder", "duplicate", 
"capacity"};
+
+static char*
+flags2txt(uint32 pkt_flags)
+{
+       static char flags[64];
+
+       char *p = flags;
+       *p = '\0';
+       int bytes = 0;
+       for (size_t i = 0; i < sizeof(flags_text)/sizeof(flags_text[0]); ++i) {
+               if (pkt_flags & (1 << i))
+                       bytes += snprintf(p + bytes, 64, "%s | ", 
flags_text[i]);
+       }
+
+       if (bytes > 0)
+               bytes -= 3;
+       *(p + bytes) = '\0';
+
+       return flags;
+}
+
+/*
+ * CursorICHistoryTable::prune
+ *             Prune entries in the hash table.
+ */
+void
+CursorICHistoryTable::prune(uint32 icId) {
+       for (uint8 index = 0; index < size; index++) {
+               CursorICHistoryEntry *p = table[index], *q = NULL;
+               while (p) {
+                       /* remove an entry if it is older than the prune-point 
*/
+                       if (p->icId < icId) {
+                               if (!q)
+                                       table[index] = p->next;
+                               else
+                                       q->next = p->next;
+
+                               /* set up next loop */
+                               CursorICHistoryEntry *trash = p;
+                               p = trash->next;
+                               ic_free(trash);
+
+                               count--;
+                       } else {
+                               q = p;
+                               p = p->next;
+                       }
+               }
+       }
+}
+
+#ifdef TRANSFER_PROTOCOL_STATS
+typedef enum TransProtoEvent TransProtoEvent;
+enum TransProtoEvent
+{
+       TPE_DATA_PKT_SEND,
+       TPE_ACK_PKT_QUERY
+};
+
+typedef struct TransProtoStatEntry TransProtoStatEntry;
+struct TransProtoStatEntry
+{
+       TransProtoStatEntry *next;
+
+       /* Basic information */
+       uint32          time;
+       TransProtoEvent event;
+       int                     dstPid;
+       uint32          seq;
+
+       /* more attributes can be added on demand. */
+
+       /*
+        * float        cwnd;
+        * int          capacity;
+        */
+};
+
+typedef struct TransProtoStats TransProtoStats;
+struct TransProtoStats
+{
+       pthread_mutex_t lock;
+       TransProtoStatEntry *head;
+       TransProtoStatEntry *tail;
+       uint64          count;
+       uint64          startTime;
+
+       void init();
+       void update(TransProtoEvent event, icpkthdr *pkt);
+       void dump();
+};
+
+static TransProtoStats trans_proto_stats =
+{
+       PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0
+};
+
+/*
+ * init
+ *             Initialize the transport protocol states data structures.
+ */
+void
+TransProtoStats::init()
+{
+       pthread_mutex_lock(&this->lock);
+
+       while (this->head) {
+               TransProtoStatEntry *cur = this->head;
+               this->head = this->head->next;
+               ic_free(cur);
+               this->count--;
+       }
+
+       this->head = NULL;
+       this->tail = NULL;
+       this->count = 0;
+       this->startTime = getCurrentTime();
+
+       pthread_mutex_unlock(&this->lock);
+}
+
+void
+TransProtoStats::update(TransProtoEvent event, icpkthdr *pkt)
+{
+       /* Add to list */
+       TransProtoStatEntry *entry = (TransProtoStatEntry *) 
ic_malloc(sizeof(TransProtoStatEntry));
+       if (!entry)
+               return;
+
+       memset(entry, 0, sizeof(*entry));
+
+       /* change the list */
+       pthread_mutex_lock(&this->lock);
+       if (this->count == 0) {
+               /* 1st element */
+               this->head = entry;
+               this->tail = entry;
+       } else {
+               this->tail->next = entry;
+               this->tail = entry;
+       }
+       this->count++;
+
+       entry->time = getCurrentTime() - this->startTime;
+       entry->event = event;
+       entry->dstPid = pkt->dstPid;
+       entry->seq = pkt->seq;
+
+       /*
+        * Other attributes can be added on demand new->cwnd =
+        * snd_control_info.cwnd; new->capacity = conn->capacity;
+        */
+
+       pthread_mutex_unlock(&this->lock);
+}
+
+static void
+TransProtoStats::dump()
+{
+       char            tmpbuf[32];
+
+       snprintf(tmpbuf, 32, "%d." UINT64_FORMAT "txt", global_param.MyProcPid, 
getCurrentTime());
+       FILE       *ofile = fopen(tmpbuf, "w+");
+
+       pthread_mutex_lock(&this->lock);
+       while (this->head)
+       {
+               TransProtoStatEntry *cur = NULL;
+
+               cur = this->head;
+               this->head = this->head->next;
+
+               fprintf(ofile, "time %d event %d seq %d destpid %d\n", 
cur->time, cur->event, cur->seq, cur->dstPid);
+               ic_free(cur);
+               this->count--;
+       }
+
+       this->tail = NULL;
+
+       pthread_mutex_unlock(&this->lock);
+
+       fclose(ofile);
+}
+
+#endif                                                 /* 
TRANSFER_PROTOCOL_STATS */
+
+/*
+ * initConnHashTable
+ *             Initialize a connection hash table.
+ */
+bool
+ConnHashTable::init()
+{
+       this->size = global_param.Gp_role == GP_ROLE_DISPATCH_IC ?
+                                (global_param.segment_number * 2) : 
global_param.ic_htab_size;
+       Assert(this->size > 0);
+
+       this->table = (struct ConnHtabBin **) ic_malloc(this->size * 
sizeof(struct ConnHtabBin *));
+       if (this->table == NULL)
+               return false;
+
+       for (int i = 0; i < this->size; i++)
+               this->table[i] = NULL;
+
+       return true;
+}
+
+/*
+ * connAddHash
+ *             Add a connection to the hash table
+ *
+ * Note: we want to add a connection to the hashtable if it isn't
+ * already there ... so we just have to check the pointer values -- no
+ * need to use CONN_HASH_MATCH() at all!
+ */
+bool
+ConnHashTable::add(UDPConn *conn)
+{
+       uint32 hashcode = CONN_HASH_VALUE(&conn->conn_info) % this->size;
+
+       /*
+        * check for collision -- if we already have an entry for this 
connection,
+        * don't add another one.
+        */
+       for (struct ConnHtabBin *bin = this->table[hashcode]; bin != NULL; bin 
= bin->next)
+       {
+               if (bin->conn == conn)
+               {
+                       LOG(DEBUG5, "ConnHashTable::add(): duplicate ?! node %d 
route %d", conn->conn_info.motNodeId, conn->route);
+                       return true;            /* false *only* indicates 
memory-alloc
+                                                                * failure. */
+               }
+       }
+
+       struct ConnHtabBin *newbin = (struct ConnHtabBin *) 
ic_malloc(sizeof(struct ConnHtabBin));
+       if (newbin == NULL)
+               return false;
+
+       newbin->conn = conn;
+       newbin->next = this->table[hashcode];
+       this->table[hashcode] = newbin;
+
+       ic_statistics.activeConnectionsNum++;
+
+       return true;
+}
+
+/*
+ * remove
+ *             Delete a connection from the hash table
+ *
+ * Note: we want to remove a connection from the hashtable if it is
+ * there ... so we just have to check the pointer values -- no need to
+ * use CONN_HASH_MATCH() at all!
+ */
+void
+ConnHashTable::remove(UDPConn *conn)
+{
+       uint32          hashcode;
+       struct ConnHtabBin *c,
+                          *p,
+                          *trash;
+
+       hashcode = CONN_HASH_VALUE(&conn->conn_info) % this->size;
+
+       c = this->table[hashcode];
+
+       /* find entry */
+       p = NULL;
+       while (c != NULL)
+       {
+               /* found ? */
+               if (c->conn == conn)
+                       break;
+
+               p = c;
+               c = c->next;
+       }
+
+       /* not found ? */
+       if (c == NULL)
+       {
+               return;
+       }
+
+       /* found the connection, remove from the chain. */
+       trash = c;
+
+       if (p == NULL)
+               this->table[hashcode] = c->next;
+       else
+               p->next = c->next;
+
+       ic_free(trash);
+
+       ic_statistics.activeConnectionsNum--;
+
+       return;
+}
+
+/*
+ * findConnByHeader
+ *             Find the corresponding connection given a pkt header 
information.
+ *
+ * With the new mirroring scheme, the interconnect is no longer involved:
+ * we don't have to disambiguate anymore.
+ *
+ * NOTE: the icpkthdr field dstListenerPort is used for disambiguation.
+ * on receivers it may not match the actual port (it may have an extra bit
+ * set (1<<31)).
+ */
+UDPConn *
+ConnHashTable::find(icpkthdr *hdr) {
+
+       uint32 hashcode = CONN_HASH_VALUE(hdr) % this->size;
+       for (struct ConnHtabBin *bin = this->table[hashcode]; bin != NULL; bin 
= bin->next) {
+               UDPConn *conn = bin->conn;
+
+               if (CONN_HASH_MATCH(&conn->conn_info, hdr)) {
+                       UDPConn *ret = conn;
+                       if (IC_DEBUG5 >= session_param.log_min_messages)
+                               LOG(DEBUG5, "ConnHashTable::find: found. route 
%d state %d hashcode %d conn %p",
+                                       conn->route, ret->state, hashcode, ret);
+
+                       return ret;
+               }
+       }
+
+       if (IC_DEBUG5 >= session_param.log_min_messages)
+               LOG(DEBUG5, "ConnHashTable::find: not found! (hdr->srcPid %d 
hdr->srcContentId %d "
+                                       "hdr->dstContentId %d hdr->dstPid %d 
sess(%d:%d) cmd(%d:%d)) hashcode %d",
+                                       hdr->srcPid, hdr->srcContentId, 
hdr->dstContentId, hdr->dstPid, hdr->sessionId,
+                                       session_param.gp_session_id, hdr->icId, 
ic_control_info.ic_instance_id, hashcode);
+
+       return NULL;
+}
+
+void
+ConnHashTable::destroy() {
+       for (int i = 0; i < this->size; i++) {
+               while (this->table[i] != NULL) {
+                       struct ConnHtabBin *trash = this->table[i];
+                       this->table[i] = trash->next;
+                       ic_free(trash);
+               }
+       }
+
+       ic_free(this->table);
+       this->table = NULL;
+       this->size = 0;
+}
+
+/*
+ * icBufferListInitHeadLink
+ *             Initialize the pointers in the head link to point to itself.
+ */
+static inline void
+icBufferListInitHeadLink(ICBufferLink *link)
+{
+       link->next = link->prev = link;
+}
+
+
+#if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING)
+
+/*
+ * icBufferListLog
+ *             Log the buffer list.
+ */
+void
+ICBufferList::icBufferListLog()
+{
+       LOG(INFO, "Length %d, type %d headptr %p", this->len, this->type, 
&this->head);
+
+       ICBufferLink *bufLink = this->head.next;
+
+       int                     len = this->len;
+       int                     i = 0;
+
+       while (bufLink != &this->head && len > 0)
+       {
+               ICBuffer   *buf = (this->type == ICBufferListType_Primary ? 
GET_ICBUFFER_FROM_PRIMARY(bufLink)
+                                                  : 
GET_ICBUFFER_FROM_SECONDARY(bufLink));
+
+               LOG(INFO, "Node %d, linkptr %p", i++, bufLink);
+
+               logPkt("from list", buf->pkt);
+               bufLink = bufLink->next;
+               len--;
+       }
+}
+#endif
+
+#ifdef USE_ASSERT_CHECKING
+/*
+ * icBufferListCheck
+ *             Buffer list sanity check.
+ */
+void
+ICBufferList::icBufferListCheck(const char *prefix)
+{
+       int len = this->len;
+       ICBufferLink *link = this->head.next;
+
+       if (len < 0)
+       {
+               LOG(LOG_ERROR, "ICBufferList ERROR %s: list length %d < 0 ", 
prefix, this->length());
+               goto error;
+       }
+
+       if (len == 0 && (this->head.prev != this->head.next && this->head.prev 
!= &this->head))
+       {
+               LOG(LOG_ERROR, "ICBufferList ERROR %s: length is 0, &list->head 
%p, prev %p, next %p",
+                                          prefix, &this->head, 
this->head.prev, this->head.next);
+               this->icBufferListLog();
+               goto error;
+       }
+
+       while (len > 0)
+       {
+               link = link->next;
+               len--;
+       }
+
+       if (link != &this->head)
+       {
+               LOG(LOG_ERROR, "ICBufferList ERROR: %s len %d", prefix, 
this->len);
+               this->icBufferListLog();
+               goto error;
+       }
+
+       return;
+
+error:
+       LOG(INFO, "wait for 120s and then abort.");
+       ic_usleep(120000000);
+       abort();
+}
+#endif
+
+/*
+ * ICBufferList::init
+ *             Initialize the buffer list with the given type.
+ */
+void
+ICBufferList::init(ICBufferListType atype)
+{
+       Assert(atype == ICBufferListType_Primary|| atype == 
ICBufferListType_Secondary);
+
+       type = atype;
+       len  = 0;
+
+       icBufferListInitHeadLink(&head);
+
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::init");
+#endif
+}
+
+/*
+ * ICBufferList::is_head
+ *             Return whether the given link is the head link of the list.
+ *
+ *     This function is often used as the end condition of an iteration of the 
list.
+ */
+bool
+ICBufferList::is_head(ICBufferLink *link)
+{
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::is_head");
+#endif
+       return (link == &head);
+}
+
+/*
+ * ICBufferList::first
+ *             Return the first link after the head link.
+ *
+ *     Note that the head link is a pseudo link used to only to ease the 
operations of the link list.
+ *     If the list only contains the head link, this function will return the 
head link.
+ */
+ICBufferLink *
+ICBufferList::first()
+{
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::first");
+#endif
+       return head.next;
+}
+
+/*
+ * ICBufferList::length
+ *             Get the list length.
+ */
+int
+ICBufferList::length()
+{
+       return len;
+}
+
+/*
+ * ICBufferList::delete
+ *             Remove an buffer from the buffer list and return the buffer.
+ */
+ICBuffer *
+ICBufferList::remove(ICBuffer *buf)
+{
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::delete");
+#endif
+
+       ICBufferLink *bufLink = NULL;
+
+       bufLink = (this->type == ICBufferListType_Primary ? &buf->primary : 
&buf->secondary);
+
+       bufLink->prev->next = bufLink->next;
+       bufLink->next->prev = bufLink->prev;
+
+       len--;
+
+       return buf;
+}
+
+/*
+ * ICBufferList::pop
+ *             Remove the head buffer from the list.
+ */
+ICBuffer *
+ICBufferList::pop()
+{
+       ICBuffer   *buf = NULL;
+       ICBufferLink *bufLink = NULL;
+
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::pop");
+#endif
+
+       if (this->len == 0)
+               return NULL;
+
+       bufLink = this->first();
+       buf = (this->type == ICBufferListType_Primary ? 
GET_ICBUFFER_FROM_PRIMARY(bufLink)
+                  : GET_ICBUFFER_FROM_SECONDARY(bufLink));
+
+       bufLink->prev->next = bufLink->next;
+       bufLink->next->prev = bufLink->prev;
+
+       this->len--;
+
+       return buf;
+}
+
+/*
+ * ICBufferList::free
+ *             Free all the buffers in the list.
+ */
+void
+ICBufferList::destroy()
+{
+       ICBuffer   *buf = NULL;
+
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::free");
+#endif
+
+       while ((buf = this->pop()) != NULL)
+               ic_free(buf);
+}
+
+/*
+ * ICBufferList::append
+ *             Append a buffer to the tail of a double-link list.
+ */
+ICBuffer *
+ICBufferList::append(ICBuffer *buf)
+{
+       Assert(buf);
+
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::append");
+#endif
+
+       ICBufferLink *bufLink = NULL;
+
+       bufLink = (this->type == ICBufferListType_Primary ? &buf->primary : 
&buf->secondary);
+
+       bufLink->prev = this->head.prev;
+       bufLink->next = &this->head;
+
+       this->head.prev->next = bufLink;
+       this->head.prev = bufLink;
+
+       this->len++;
+
+       return buf;
+}
+
+/*
+ * ICBufferList::return
+ *             Return the buffers in the list to the free buffer list.
+ *
+ * If the buf is also in an expiration queue, we also need to remove it from 
the expiration queue.
+ *
+ */
+void
+ICBufferList::release(bool inExpirationQueue)
+{
+#ifdef USE_ASSERT_CHECKING
+       this->icBufferListCheck("ICBufferList::return");
+#endif
+       ICBuffer   *buf = NULL;
+
+       while ((buf = this->pop()) != NULL)
+       {
+               if (inExpirationQueue)  /* the buf is in also in the expiration 
queue */
+               {
+                       ICBufferList *alist = 
&unack_queue_ring.slots[buf->unackQueueRingSlot];
+                       buf = alist->remove(buf);
+                       unack_queue_ring.numOutStanding--;
+                       if (this->length() >= 1)
+                               unack_queue_ring.numSharedOutStanding--;
+               }
+
+               snd_buffer_pool.freeList.append(buf);
+       }
+}
+
+#ifdef USE_ASSERT_CHECKING
+/*
+ * ICBufferList::dump_to_file
+ *             Dump a buffer list.
+ */
+void
+ICBufferList::dump_to_file(FILE *ofile)
+{
+       this->icBufferListCheck("ICBufferList::dump_to_file");
+
+       ICBufferLink *bufLink = this->head.next;
+
+       int                     len = this->len;
+       int                     i = 0;
+
+       fprintf(ofile, "List Length %d\n", len);
+       while (bufLink != &this->head && len > 0)
+       {
+               ICBuffer   *buf = (this->type == ICBufferListType_Primary ? 
GET_ICBUFFER_FROM_PRIMARY(bufLink)
+                                                  : 
GET_ICBUFFER_FROM_SECONDARY(bufLink));
+
+               fprintf(ofile, "Node %d, linkptr %p ", i++, bufLink);
+               fprintf(ofile, "Packet Content [%s: seq %d extraSeq %d]: 
motNodeId %d, crc %d len %d "
+                               "srcContentId %d dstDesContentId %d "
+                               "srcPid %d dstPid %d "
+                               "srcListenerPort %d dstListernerPort %d "
+                               "sendSliceIndex %d recvSliceIndex %d "
+                               "sessionId %d icId %d "
+                               "flags %d\n",
+                               buf->pkt->flags & 
UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
+                               buf->pkt->seq, buf->pkt->extraSeq, 
buf->pkt->motNodeId, buf->pkt->crc, buf->pkt->len,
+                               buf->pkt->srcContentId, buf->pkt->dstContentId,
+                               buf->pkt->srcPid, buf->pkt->dstPid,
+                               buf->pkt->srcListenerPort, 
buf->pkt->dstListenerPort,
+                               buf->pkt->sendSliceIndex, 
buf->pkt->recvSliceIndex,
+                               buf->pkt->sessionId, buf->pkt->icId,
+                               buf->pkt->flags);
+               bufLink = bufLink->next;
+               len--;
+       }
+}
+#endif
+
+/*
+ * initUnackQueueRing
+ *             Initialize an unack queue ring.
+ *
+ *     Align current time to a slot boundary and set current slot index (time 
pointer) to 0.
+ */
+static void
+initUnackQueueRing(UnackQueueRing *uqr)
+{
+       int                     i = 0;
+
+       uqr->currentTime = 0;
+       uqr->idx = 0;
+       uqr->numOutStanding = 0;
+       uqr->numSharedOutStanding = 0;
+
+       for (; i < UNACK_QUEUE_RING_SLOTS_NUM; i++)
+       {
+               uqr->slots[i].init(ICBufferListType_Secondary);
+       }
+}
+
+/*
+ * RxBufferPool::get
+ *             Get a receive buffer.
+ *
+ * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
+ *
+ * NOTE: This function MUST NOT contain elog or ereport statements.
+ * elog is NOT thread-safe.  Developers should instead use something like:
+ *
+ * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
+ */
+icpkthdr *
+RxBufferPool::get()
+{
+       icpkthdr   *ret = NULL;
+
+#ifdef USE_ASSERT_CHECKING
+       if (FINC_HAS_FAULT(FINC_RX_BUF_NULL) &&
+               
testmode_inject_fault(session_param.gp_udpic_fault_inject_percent))
+               return NULL;
+#endif
+
+       do
+       {
+               if (this->freeList == NULL)
+               {
+                       if (this->count > this->maxCount)
+                       {
+                               if (IC_DEBUG3 >= session_param.log_min_messages)
+                                       LOG(DEBUG3, "Interconnect ran out of 
rx-buffers count/max %d/%d", this->count, this->maxCount);
+                               break;
+                       }
+
+                       /* malloc is used for thread safty. */
+                       ret = (icpkthdr *) 
ic_malloc(global_param.Gp_max_packet_size);
+
+                       /*
+                        * Note: we return NULL if the malloc() fails -- and the
+                        * background thread will set the error. Main thread 
will check
+                        * the error, report it and start teardown.
+                        */
+                       if (ret != NULL)
+                               this->count++;
+
+                       break;
+               }
+
+               /* we have buffers available in our freelist */
+               ret = this->get_free();
+
+       } while (0);
+
+       return ret;
+}
+
+/*
+ * RxBufferPool::put
+ *             Return a receive buffer to free list
+ *
+ *  SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
+ */
+void
+RxBufferPool::put(icpkthdr *buf)
+{
+       /* return the buffer into the free list. */
+       *(char **) buf = this->freeList;
+       this->freeList = (char *)buf;
+}
+
+/*
+ * RxBufferPool::get_free
+ *             Get a receive buffer from free list
+ *
+ * SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
+ *
+ * NOTE: This function MUST NOT contain elog or ereport statements.
+ * elog is NOT thread-safe.  Developers should instead use something like:
+ *
+ * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
+ */
+icpkthdr *
+RxBufferPool::get_free()
+{
+       icpkthdr   *buf = NULL;
+
+       buf = (icpkthdr *) this->freeList;
+       this->freeList = *(char **) (this->freeList);
+       return buf;
+}
+
+/*
+ * RxBufferPool::free
+ *             Free a receive buffer.
+ *
+ * NOTE: This function MUST NOT contain elog or ereport statements.
+ * elog is NOT thread-safe.  Developers should instead use something like:
+ *
+ * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
+ */
+void
+RxBufferPool::release(icpkthdr *buf)
+{
+       ic_free(buf);
+       count--;
+}
+
+/*
+ * init
+ *             Initialize the send buffer pool.
+ *
+ * The initial maxCount is set to 1 for gp_interconnect_snd_queue_depth = 1 
case,
+ * then there is at least an extra free buffer to send for that case.
+ */
+void
+SendBufferPool::init()
+{
+       this->freeList.init(ICBufferListType_Primary);
+       this->count = 0;
+       this->maxCount = (session_param.Gp_interconnect_snd_queue_depth == 1 ? 
1 : 0);
+}
+
+/*
+ * clean
+ *             Clean the send buffer pool.
+ */
+void
+SendBufferPool::clean()
+{
+       this->freeList.destroy();
+       this->count = 0;
+       this->maxCount = 0;
+}
+
+/*
+ * get
+ *             Get a send buffer for a connection.
+ *
+ *  Different flow control mechanisms use different buffer management policies.
+ *  Capacity based flow control uses per-connection buffer policy and Loss 
based
+ *  flow control uses shared buffer policy.
+ *
+ *     Return NULL when no free buffer available.
+ */
+ICBuffer *
+SendBufferPool::get(UDPConn *conn)
+{
+       ICBuffer   *ret = NULL;
+
+       ic_statistics.totalBuffers += (this->freeList.length() + this->maxCount 
- this->count);
+       ic_statistics.bufferCountingTime++;
+
+       /* Capacity based flow control does not use shared buffers */
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_CAPACITY_IC)
+       {
+               Assert(conn->unackQueue.length() + conn->sndQueue.length() <= 
session_param.Gp_interconnect_snd_queue_depth);
+               if (conn->unackQueue.length() + conn->sndQueue.length() >= 
session_param.Gp_interconnect_snd_queue_depth)
+                       return NULL;
+       }
+
+       if (this->freeList.length() > 0)
+       {
+               return this->freeList.pop();
+       }
+       else
+       {
+               if (this->count < this->maxCount)
+               {
+                       ret = (ICBuffer *) 
ic_malloc0(global_param.Gp_max_packet_size + sizeof(ICBuffer));
+                       this->count++;
+                       ret->conn = NULL;
+                       ret->nRetry = 0;
+                       icBufferListInitHeadLink(&ret->primary);
+                       icBufferListInitHeadLink(&ret->secondary);
+                       ret->unackQueueRingSlot = 0;
+               }
+               else
+               {
+                       return NULL;
+               }
+       }
+
+       return ret;
+}
+
+/*
+ * addCRC
+ *             add CRC field to the packet.
+ */
+static void
+addCRC(icpkthdr *pkt)
+{
+       pkt->crc = ComputeCRC(pkt, pkt->len);
+}
+
+/*
+ * checkCRC
+ *             check the validity of the packet.
+ */
+static bool
+checkCRC(icpkthdr *pkt)
+{
+       uint32          rx_crc,
+                               local_crc;
+
+       rx_crc = pkt->crc;
+       pkt->crc = 0;
+       local_crc = ComputeCRC(pkt, pkt->len);
+       if (rx_crc != local_crc)
+       {
+               return false;
+       }
+
+       return true;
+}
+
+
+/*
+ * checkRxThreadError
+ *             Check whether there was error in the background thread in main 
thread.
+ *
+ *     If error found, report it.
+ */
+static void
+checkRxThreadError()
+{
+       int eno;
+
+       eno = ic_atomic_read_u32(&ic_control_info.eno);
+       if (eno != 0)
+       {
+               errno = eno;
+
+               std::stringstream ss;
+               ss <<"ERROR, interconnect encountered an error, in receive 
background thread: "<<strerror(eno);
+               throw ICReceiveThreadException(ss.str(), __FILE__, __LINE__);
+       }
+}
+
+/*
+ * setRxThreadError
+ *             Set the error no in background thread.
+ *
+ *     Record the error in background thread. Main thread checks the errors 
periodically.
+ *     If main thread will find it, main thread will handle it.
+ */
+static void
+setRxThreadError(int eno)
+{
+       uint32 expected = 0;
+
+       /* always let main thread know the error that occurred first. */
+       if (ic_atomic_compare_exchange_u32(&ic_control_info.eno, &expected, 
(uint32) eno))
+       {
+               LOG(LOG_ERROR, "Interconnect error: in background thread, set 
ic_control_info.eno to %d, "
+                                          "rx_buffer_pool.count %d, 
rx_buffer_pool.maxCount %d",
+                                          expected, rx_buffer_pool.count, 
rx_buffer_pool.maxCount);
+       }
+}
+
+/*
+ * resetRxThreadError
+ *             Reset the error no.
+ *
+ */
+static void
+resetRxThreadError()
+{
+       ic_atomic_write_u32(&ic_control_info.eno, 0);
+}
+
+/*
+  * Set UDP IC send/receive socket buffer size.
+  *
+  * We must carefully size the UDP IC socket's send/receive buffers. If the 
size
+  * is too small, say 128K, and send queue depth and receive queue depth are
+  * large, then there might be a lot of dropped/reordered packets. We start
+  * trying from a size of 2MB (unless Gp_udp_bufsize_k is specified), and
+  * gradually back off to UDPIC_MIN_BUF_SIZE. For a given size setting to be
+  * successful, the corresponding UDP kernel buffer size params must be 
adequate.
+  *
+ */
+static uint32
+setUDPSocketBufferSize(int ic_socket, int buffer_type)
+{
+    int                 expected_size;
+    int                 curr_size;
+    socklen_t           option_len = 0;
+
+    Assert(buffer_type == SO_SNDBUF || buffer_type == SO_RCVBUF);
+
+       expected_size = (global_param.Gp_udp_bufsize_k ? 
global_param.Gp_udp_bufsize_k * 1024 : 2048 * 1024);
+
+    curr_size = expected_size;
+    option_len = sizeof(curr_size);
+    while (setsockopt(ic_socket, SOL_SOCKET, buffer_type, (const char *) 
&curr_size, option_len) < 0)
+       {
+               if(session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                       LOG(DEBUG3, "UDP-IC: setsockopt %s failed to set buffer 
size = %d bytes: %m",
+                          buffer_type == SO_SNDBUF ? "send": "receive",
+                          curr_size);
+
+        curr_size = curr_size >> 1;
+        if (curr_size < UDPIC_MIN_BUF_SIZE)
+            return -1;
+       }
+
+       if(session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+       LOG(DEBUG3, "UDP-IC: socket %s current buffer size = %d bytes",
+                      buffer_type == SO_SNDBUF ? "send": "receive",
+                      curr_size);
+
+    return curr_size;
+}
+
+
+/*
+ * setupUDPListeningSocket
+ *             Setup udp listening socket.
+ */
+static void
+setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int 
*txFamily, struct sockaddr_storage *listenerSockaddr)
+{
+    struct addrinfo         *addrs = NULL;
+    struct addrinfo         *addr;
+    struct addrinfo         hints;
+    int                     ret;
+    int                     ic_socket = INVALID_SOCKET;
+    struct sockaddr_storage ic_socket_addr;
+    int                     tries = 0;
+    struct sockaddr_storage listenerAddr;
+    socklen_t               listenerAddrlen = sizeof(ic_socket_addr);
+    uint32                  socketSendBufferSize;
+    uint32                  socketRecvBufferSize;
+
+       memset(&hints, 0, sizeof(struct addrinfo));
+       hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
+       hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+       hints.ai_protocol = 0;
+    hints.ai_addrlen = 0;
+    hints.ai_addr = NULL;
+    hints.ai_canonname = NULL;
+    hints.ai_next = NULL;
+    hints.ai_flags |= AI_NUMERICHOST;
+
+#ifdef USE_ASSERT_CHECKING
+       if (session_param.gp_udpic_network_disable_ipv6)
+               hints.ai_family = AF_INET;
+#endif
+
+       if (global_param.Gp_interconnect_address_type == 
INTERCONNECT_ADDRESS_TYPE_UNICAST_IC)
+       {
+               Assert(global_param.interconnect_address && 
strlen(global_param.interconnect_address) > 0);
+               hints.ai_flags |= AI_NUMERICHOST;
+               if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                       LOG(DEBUG3, "getaddrinfo called with unicast address: 
%s", global_param.interconnect_address);
+       }
+       else
+       {
+               Assert(global_param.interconnect_address == NULL);
+               hints.ai_flags |= AI_PASSIVE;
+               if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                       LOG(DEBUG3, "getaddrinfo called with wildcard address");
+       }
+
+       /*
+     * Restrict what IP address we will listen on to just the one that was
+     * used to create this QE session.
+        */
+    Assert(global_param.interconnect_address && 
strlen(global_param.interconnect_address) > 0);
+    ret = getaddrinfo((!global_param.interconnect_address || 
global_param.interconnect_address[0] == '\0') ? NULL : 
global_param.interconnect_address,
+                       NULL, &hints, &addrs);
+    if (ret || !addrs)
+    {
+        LOG(INFO, "could not resolve address for UDP IC socket %s: %s",
+                        global_param.interconnect_address,
+                        gai_strerror(ret));
+        goto startup_failed;
+    }
+
+       /*
+     * On some platforms, pg_getaddrinfo_all() may return multiple addresses
+     * only one of which will actually work (eg, both IPv6 and IPv4 addresses
+     * when kernel will reject IPv6).  Worse, the failure may occur at the
+     * bind() or perhaps even connect() stage.  So we must loop through the
+     * results till we find a working combination. We will generate DEBUG
+     * messages, but no error, for bogus combinations.
+        */
+       for (addr = addrs; addr != NULL; addr = addr->ai_next)
+       {
+#ifdef HAVE_UNIX_SOCKETS
+        /* Ignore AF_UNIX sockets, if any are returned. */
+        if (addr->ai_family == AF_UNIX)
+            continue;
+#endif
+
+               if (++tries > 1 && session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                       LOG(DEBUG3, "trying another address for UDP 
interconnect socket");
+
+        ic_socket = socket(addr->ai_family, addr->ai_socktype, 
addr->ai_protocol);
+        if (ic_socket == INVALID_SOCKET)
+               {
+                       if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                               LOG(DEBUG3, "could not create UDP interconnect 
socket: %m");
+                       continue;
+               }
+
+        /*
+         * Bind the socket to a kernel assigned ephemeral port on the
+         * interconnect_address.
+         */
+        if (bind(ic_socket, addr->ai_addr, addr->ai_addrlen) < 0)
+               {
+                       if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                               LOG(DEBUG3, "could not bind UDP interconnect 
socket: %m");
+            closesocket(ic_socket);
+            ic_socket = INVALID_SOCKET;
+            continue;
+               }
+
+        /* Call getsockname() to eventually obtain the assigned ephemeral port 
*/
+        if (getsockname(ic_socket, (struct sockaddr *) &listenerAddr, 
&listenerAddrlen) < 0)
+               {
+                       if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                               LOG(DEBUG3, "could not get address of UDP 
interconnect socket: %m");
+            closesocket(ic_socket);
+            ic_socket = INVALID_SOCKET;
+            continue;
+               }
+
+       /* If we get here, we have a working socket */
+       break;
+       }
+
+    if (!addr || ic_socket == INVALID_SOCKET)
+        goto startup_failed;
+
+    /* Memorize the socket fd, kernel assigned port and address family */
+    *listenerSocketFd = ic_socket;
+    if (listenerAddr.ss_family == AF_INET6)
+    {
+        *listenerPort = ntohs(((struct sockaddr_in6 *) 
&listenerAddr)->sin6_port);
+        *txFamily = AF_INET6;
+    }
+    else
+    {
+        *listenerPort = ntohs(((struct sockaddr_in *) 
&listenerAddr)->sin_port);
+        *txFamily = AF_INET;
+    }
+
+    /*
+     * cache the successful sockaddr of the listening socket, so
+     * we can use this information to connect to the listening socket.
+     */
+    if (listenerSockaddr != NULL)
+        memcpy(listenerSockaddr, &listenerAddr, sizeof(struct 
sockaddr_storage));
+
+    /* Set up socket non-blocking mode */
+    if (!ic_set_noblock(ic_socket))
+    {
+        LOG(INFO, "could not set UDP interconnect socket to nonblocking mode: 
%s", strerror(errno));
+        goto startup_failed;
+    }
+
+    /* Set up the socket's send and receive buffer sizes. */
+    socketRecvBufferSize = setUDPSocketBufferSize(ic_socket, SO_RCVBUF);
+    if (socketRecvBufferSize == static_cast<uint32>(-1))
+        goto startup_failed;
+    ic_control_info.socketRecvBufferSize = socketRecvBufferSize;
+
+    socketSendBufferSize = setUDPSocketBufferSize(ic_socket, SO_SNDBUF);
+    if (socketSendBufferSize == static_cast<uint32>(-1))
+        goto startup_failed;
+    ic_control_info.socketSendBufferSize = socketSendBufferSize;
+
+       if (addrs != NULL)
+               freeaddrinfo(addrs);
+       return;
+
+startup_failed:
+       if (addrs)
+               freeaddrinfo(addrs);
+       if (ic_socket != INVALID_SOCKET)
+       {
+               closesocket(ic_socket);
+       }
+
+       std::stringstream ss;
+       ss << "ERROR,interconnect error: Could not set up udp listener socket: 
" << strerror(errno);
+       throw ICNetworkException(ss.str(), __FILE__, __LINE__);
+}
+
+/*
+ * InitMutex
+ *             Initialize mutex.
+ */
+static void
+initMutex(pthread_mutex_t *mutex)
+{
+       pthread_mutexattr_t m_atts;
+
+       pthread_mutexattr_init(&m_atts);
+       pthread_mutexattr_settype(&m_atts, PTHREAD_MUTEX_ERRORCHECK);
+
+       pthread_mutex_init(mutex, &m_atts);
+}
+
+/*
+ * Set up the udp interconnect pthread signal mask, we don't want to run our 
signal handlers
+ */
+static void
+ic_set_pthread_sigmasks(sigset_t *old_sigs)
+{
+#ifndef WIN32
+       sigset_t sigs;
+       int              err;
+
+       sigfillset(&sigs);
+
+       err = pthread_sigmask(SIG_BLOCK, &sigs, old_sigs);
+       if (err != 0)
+       {
+               std::stringstream ss;
+               ss << "ERROR: Failed to get pthread signal masks with return 
value: "<<err;
+               throw ICReceiveThreadException(ss.str(), __FILE__, __LINE__);
+       }
+#else
+       (void) old_sigs;
+#endif
+}
+
+static void
+ic_reset_pthread_sigmasks(sigset_t *sigs)
+{
+#ifndef WIN32
+       int err;
+
+       err = pthread_sigmask(SIG_SETMASK, sigs, NULL);
+       if (err != 0)
+       {
+               std::stringstream ss;
+               ss <<"ERROR: Failed to reset pthread signal masks with return 
value: "<<err;
+               throw ICReceiveThreadException(ss.str(), __FILE__, __LINE__);
+       }
+#else
+       (void) sigs;
+#endif
+}
+
+/*
+ * InitMotionUDPIFC
+ *             Initialize UDP specific comms, and create rx-thread.
+ */
+static inline void
+InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
+{
+       int                     pthread_err;
+       int                     txFamily = -1;
+
+       /* attributes of the thread we're creating */
+       pthread_attr_t t_atts;
+       sigset_t           pthread_sigs;
+
+#ifdef USE_ASSERT_CHECKING
+       set_test_mode();
+#endif
+
+       /* Initialize global ic control data. */
+       ic_atomic_init_u32(&ic_control_info.eno, 0);
+       ic_control_info.isSender = false;
+       ic_control_info.socketSendBufferSize = 2 * 1024 * 1024;
+       ic_control_info.socketRecvBufferSize = 2 * 1024 * 1024;
+       initMutex(&ic_control_info.lock);
+       ic_atomic_init_u32(&ic_control_info.shutdown, 0);
+       ic_control_info.threadCreated = false;
+       ic_control_info.ic_instance_id = 0;
+
+       ic_control_info.connHtab.init();
+       if (!ic_control_info.startupCacheHtab.init())
+       {
+               throw ICFatalException("FATAL, failed to initialize connection 
htab for startup cache", __FILE__, __LINE__);
+       }
+
+       /*
+        * setup listening socket and sending socket for Interconnect.
+        */
+    setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, 
&udp_dummy_packet_sockaddr);
+    setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, 
NULL);
+
+       /* Initialize receive control data. */
+       rx_control_info.mainWaitingState.reset();
+
+       /* allocate a buffer for sending disorder messages */
+       rx_control_info.disorderBuffer = (icpkthdr 
*)ic_malloc0(MIN_PACKET_SIZE);
+       rx_control_info.lastDXatId = InvalidTransactionId;
+       rx_control_info.lastTornIcId = 0;
+       rx_control_info.cursorHistoryTable.init();
+
+       /* Initialize receive buffer pool */
+       rx_buffer_pool.count = 0;
+       rx_buffer_pool.maxCount = 1;
+       rx_buffer_pool.freeList = NULL;
+
+       /* Initialize send control data */
+       snd_control_info.cwnd = 0;
+       snd_control_info.minCwnd = 0;
+       snd_control_info.ackBuffer = (icpkthdr *)ic_malloc0(MIN_PACKET_SIZE);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+       initMutex(&trans_proto_stats.lock);
+#endif
+
+       /* Start up our rx-thread */
+
+       /*
+        * save ourselves some memory: the defaults for thread stack size are
+        * large (1M+)
+        */
+       pthread_attr_init(&t_atts);
+
+       pthread_attr_setstacksize(&t_atts, Max(PTHREAD_STACK_MIN, (128 * 
1024)));
+       ic_set_pthread_sigmasks(&pthread_sigs);
+       pthread_err = pthread_create(&ic_control_info.threadHandle, &t_atts, 
rxThreadFunc, NULL);
+       ic_reset_pthread_sigmasks(&pthread_sigs);
+
+       pthread_attr_destroy(&t_atts);
+       if (pthread_err != 0)
+       {
+               ic_control_info.threadCreated = false;
+               std::stringstream ss;
+               ss<<"FATAL: pthread_create() failed with err "<<pthread_err;
+               throw ICFatalException(ss.str(), __FILE__, __LINE__);
+       }
+
+       ic_control_info.threadCreated = true;
+}
+
+/*
+ * CleanupMotionUDPIFC
+ *             Clean up UDP specific stuff such as cursor ic hash table, 
thread etc.
+ */
+static inline void
+CleanupMotionUDPIFC(void)
+{
+       LOG(DEBUG2, "udp-ic: telling receiver thread to shutdown.");
+
+       /*
+        * We should not hold any lock when we reach here even when we report
+        * FATAL errors. Just in case, We still release the locks here.
+        */
+       pthread_mutex_unlock(&ic_control_info.lock);
+
+       /* Shutdown rx thread. */
+       ic_atomic_write_u32(&ic_control_info.shutdown, 1);
+
+       if (ic_control_info.threadCreated)
+               pthread_join(ic_control_info.threadHandle, NULL);
+
+       LOG(DEBUG2, "udp-ic: receiver thread shutdown.");
+
+       rx_control_info.cursorHistoryTable.purge();
+
+       ic_control_info.connHtab.destroy();
+
+       /* background thread exited, we can do the cleanup without locking. */
+       cleanupStartupCache();
+       ic_control_info.startupCacheHtab.destroy();
+
+       /* free the disorder buffer */
+       ic_free(rx_control_info.disorderBuffer);
+       rx_control_info.disorderBuffer = NULL;
+
+       /* free the buffer for acks */
+       ic_free(snd_control_info.ackBuffer);
+       snd_control_info.ackBuffer = NULL;
+
+       if (ICSenderSocket >= 0)
+               closesocket(ICSenderSocket);
+       ICSenderSocket = -1;
+       ICSenderPort = 0;
+       ICSenderFamily = 0;
+
+    memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr));
+
+#ifdef USE_ASSERT_CHECKING
+
+       /*
+        * Check malloc times, in Interconnect part, memory are carefully 
released
+        * in tear down code (even when error occurred). But if a FATAL error is
+        * reported, tear down code will not be executed. Thus, it is still
+        * possible the malloc times and free times do not match when we reach
+        * here. The process will die in this case, the mismatch does not
+        * introduce issues.
+        */
+       if (icudp_malloc_times != 0)
+               LOG(INFO, "WARNING: malloc times and free times do not match. 
remain alloc times: %ld", icudp_malloc_times);
+#endif
+}
+
+/*
+ * getSockAddr
+ *             Convert IP addr and port to sockaddr
+ */
+static void
+getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char 
*listenerAddr, int listenerPort)
+{
+       int                     ret;
+       char            portNumberStr[32];
+       char       *service;
+       struct addrinfo *addrs = NULL;
+       struct addrinfo hint;
+
+       /*
+        * Get socketaddr to connect to.
+        */
+
+       /* Initialize hint structure */
+       memset(&hint, 0, sizeof(hint));
+       hint.ai_socktype = SOCK_DGRAM;  /* UDP */
+       hint.ai_family = AF_UNSPEC; /* Allow for any family (v4, v6, even unix 
in
+                                                                * the future)  
*/
+#ifdef AI_NUMERICSERV
+       hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;        /* Never do name
+                                                                               
                                 * resolution */
+#else
+       hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */
+#endif
+
+       snprintf(portNumberStr, sizeof(portNumberStr), "%d", listenerPort);
+       service = portNumberStr;
+
+       addrs = NULL;
+       /* NULL has special meaning to getaddrinfo(). */
+       ret = getaddrinfo((!listenerAddr || listenerAddr[0] == '\0') ? NULL : 
listenerAddr,
+                                        service, &hint, &addrs);
+       if (ret || !addrs)
+       {
+               if (addrs)
+                       freeaddrinfo(addrs);
+
+               std::stringstream ss;
+               ss<<"ERROR, interconnect error: Could not parse remote listener 
address: '"<<listenerAddr<<
+                       "' port '"<<listenerPort<<"': "<<gai_strerror(ret)<<" 
getaddrinfo() unable to parse address: '"<<listenerAddr<<"'";
+               throw ICNetworkException(ss.str(), __FILE__, __LINE__);
+       }
+
+       /*
+        * Since we aren't using name resolution, getaddrinfo will return only 1
+        * entry
+        */
+
+       LOG(DEBUG1, "GetSockAddr socket ai_family %d ai_socktype %d ai_protocol 
%d for %s ",
+                               addrs->ai_family, addrs->ai_socktype, 
addrs->ai_protocol, listenerAddr);
+       memset(peer, 0, sizeof(struct sockaddr_storage));
+       memcpy(peer, addrs->ai_addr, addrs->ai_addrlen);
+       *peer_len = addrs->ai_addrlen;
+
+       if (addrs)
+               freeaddrinfo(addrs);
+}
+
+/*
+ * format_sockaddr
+ *                     Format a sockaddr to a human readable string
+ *
+ * This function must be kept threadsafe, elog/ereport/palloc etc are not
+ * allowed within this function.
+ */
+static char *
+format_sockaddr_udp(struct sockaddr_storage *sa, char *buf, size_t len)
+{
+       int                     ret;
+       char            remote_host[NI_MAXHOST];
+       char            remote_port[NI_MAXSERV];
+
+       ret = getnameinfo((const struct sockaddr *)sa, sizeof(struct 
sockaddr_storage),
+                                                        remote_host, 
sizeof(remote_host),
+                                                        remote_port, 
sizeof(remote_port),
+                                                        NI_NUMERICHOST | 
NI_NUMERICSERV);
+       if (ret != 0)
+       {
+               strncpy(remote_host, "???", sizeof(remote_host));
+               strncpy(remote_port, "???", sizeof(remote_port));
+       }
+
+       if (ret != 0)
+               snprintf(buf, len, "?host?:?port?");
+       else
+       {
+#ifdef HAVE_IPV6
+               if (sa->ss_family == AF_INET6)
+                       snprintf(buf, len, "[%s]:%s", remote_host, remote_port);
+               else
+#endif
+                       snprintf(buf, len, "%s:%s", remote_host, remote_port);
+       }
+
+       return buf;
+}
+
+/*
+ * setupOutgoingUDPConnection
+ *             Setup outgoing UDP connection.
+ */
+static void
+setupOutgoingUDPConnection(int icid, TransportEntry *pEntry, UDPConn *conn)
+{
+       ICCdbProcess *cdbProc = NULL;
+
+       Assert(pEntry);
+
+       cdbProc = conn->cdbProc;
+       Assert(conn->state == mcsSetupOutgoingConnection);
+       Assert(conn->cdbProc);
+
+       conn->remoteContentId = cdbProc->contentid;
+       conn->stat_min_ack_time = ~((uint64) 0);
+
+       /* Save the information for the error message if getaddrinfo fails */
+       if (strchr(cdbProc->listenerAddr, ':') != 0)
+               snprintf(conn->remoteHostAndPort, 
sizeof(conn->remoteHostAndPort),
+                                "[%s]:%d", cdbProc->listenerAddr, 
cdbProc->listenerPort);
+       else
+               snprintf(conn->remoteHostAndPort, 
sizeof(conn->remoteHostAndPort),
+                                "%s:%d", cdbProc->listenerAddr, 
cdbProc->listenerPort);
+
+       /*
+        * Get socketaddr to connect to.
+        */
+       getSockAddr(&conn->peer, &conn->peer_len, cdbProc->listenerAddr, 
cdbProc->listenerPort);
+
+       /* Save the destination IP address */
+       format_sockaddr_udp(&conn->peer, conn->remoteHostAndPort,
+                                  sizeof(conn->remoteHostAndPort));
+
+       Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == 
AF_INET6);
+
+       {
+#ifdef USE_ASSERT_CHECKING
+               {
+                       struct sockaddr_storage source_addr;
+                       socklen_t       source_addr_len;
+
+                       memset(&source_addr, 0, sizeof(source_addr));
+                       source_addr_len = sizeof(source_addr);
+
+                       if (getsockname(pEntry->txfd, (struct sockaddr *) 
&source_addr, &source_addr_len) == -1)
+                       {
+                               throw ICNetworkException(std::string("ERROR, 
interconnect Error: Could not get port from socket, %m")+strerror(errno), 
__FILE__, __LINE__);
+                       }
+                       Assert(pEntry->txfd_family == source_addr.ss_family);
+               }
+#endif
+
+               /*
+                * If the socket was created with a different address family 
than the
+                * place we are sending to, we might need to do something 
special.
+                */
+               if (pEntry->txfd_family != conn->peer.ss_family)
+               {
+                       /*
+                        * If the socket was created AF_INET6, but the address 
we want to
+                        * send to is IPv4 (AF_INET), we might need to change 
the address
+                        * format.  On Linux, it isn't necessary:  glibc 
automatically
+                        * handles this.  But on MAC OSX and Solaris, we need 
to convert
+                        * the IPv4 address to an V4-MAPPED address in AF_INET6 
format.
+                        */
+                       if (pEntry->txfd_family == AF_INET6)
+                       {
+                               LOG(DEBUG1, "We are inet6, remote is inet.  
Converting to v4 mapped address.");
+                               ConvertToIPv4MappedAddr(&conn->peer, 
&conn->peer_len);
+                       }
+                       else
+                       {
+                               /*
+                                * If we get here, something is really wrong.  
We created the
+                                * socket as IPv4-only (AF_INET), but the 
address we are
+                                * trying to send to is IPv6.  It's possible we 
could have a
+                                * V4-mapped address that we could convert to 
an IPv4 address,
+                                * but there is currently no code path where 
that could
+                                * happen.  So this must be an error.
+                                */
+                               throw ICNetworkException("ERROR: Trying to use 
an IPv4 (AF_INET) socket to send to an IPv6 address", __FILE__, __LINE__);
+                       }
+               }
+       }
+
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+               LOG(DEBUG1, "Interconnect connecting to seg%d slice%d %s pid=%d 
sockfd=%d",
+                                       conn->remoteContentId, 
pEntry->recvSlice->sliceIndex, conn->remoteHostAndPort, conn->cdbProc->pid, 
conn->sockfd);
+
+       /* send connection request */
+       memset(&conn->conn_info, 0, sizeof(conn->conn_info));
+       conn->conn_info.len = 0;
+       conn->conn_info.flags = 0;
+       conn->conn_info.motNodeId = pEntry->motNodeId;
+
+       conn->conn_info.recvSliceIndex = pEntry->recvSlice->sliceIndex;
+       conn->conn_info.sendSliceIndex = pEntry->sendSlice->sliceIndex;
+       conn->conn_info.srcContentId = global_param.segindex;
+       conn->conn_info.dstContentId = conn->cdbProc->contentid;
+
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+               LOG(DEBUG1, "setupOutgoingUDPConnection: node %d route %d 
srccontent %d dstcontent %d: %s",
+                                       pEntry->motNodeId, conn->route, 
global_param.segindex, conn->cdbProc->contentid, conn->remoteHostAndPort);
+
+       conn->conn_info.srcListenerPort = UDP2_GetListenPortUDP();
+       conn->conn_info.srcPid = global_param.MyProcPid;
+       conn->conn_info.dstPid = conn->cdbProc->pid;
+       conn->conn_info.dstListenerPort = conn->cdbProc->listenerPort;
+
+       conn->conn_info.sessionId = session_param.gp_session_id;
+       conn->conn_info.icId = icid;
+
+       ic_control_info.connHtab.add(conn);
+
+       /*
+        * No need to get the connection lock here, since background rx thread
+        * will never access send connections.
+        */
+       conn->msgPos = NULL;
+       conn->msgSize = sizeof(conn->conn_info);
+       conn->stillActive = true;
+       conn->conn_info.seq = 1;
+       Assert(conn->peer.ss_family == AF_INET || conn->peer.ss_family == 
AF_INET6);
+}
+
+/*
+ * If the socket was created AF_INET6, but the address we want to
+ * send to is IPv4 (AF_INET), we need to change the address
+ * format. On Linux, this is not necessary: glibc automatically
+ * handles this. But on MAC OSX and Solaris, we need to convert
+ * the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format.
+ *
+ * The comment above relies on getaddrinfo() via function getSockAddr to get
+ * the correct V4-mapped address. We need to be careful here as we need to
+ * ensure that the platform we are using is POSIX 1003-2001 compliant.
+ * Just to be on the safeside, we'll be keeping this function for
+ * now to be used for all platforms and not rely on POSIX.
+ *
+ * Since this can be called in a signal handler, we avoid the use of
+ * async-signal unsafe functions such as memset/memcpy
+ */
+static void
+ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len)
+{
+       const struct sockaddr_in *in = (const struct sockaddr_in *)sockaddr;
+       struct sockaddr_storage temp = {0};
+       struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *)&temp;
+
+       /* Construct a IPv4-to-IPv6 mapped address.  */
+       temp.ss_family = AF_INET6;
+       in6_new->sin6_family = AF_INET6;
+       in6_new->sin6_port = in->sin_port;
+       in6_new->sin6_flowinfo = 0;
+
+       ((uint16 *)&in6_new->sin6_addr)[5] = 0xffff;
+
+       in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
+       in6_new->sin6_scope_id = 0;
+
+       /* copy it back */
+       *sockaddr = temp;
+       *o_len = sizeof(struct sockaddr_in6);
+}
+
+#if defined(__darwin__)
+/* macos does not accept :: as the destination, we will need to covert this to 
the IPv6 loopback */
+static void
+ConvertIPv6WildcardToLoopback(struct sockaddr_storage *dest)
+{
+       char address[INET6_ADDRSTRLEN];
+       /* we want to terminate our own process, so this should be local */
+       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 
*)&udp_dummy_packet_sockaddr;
+       inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address));
+       if (strcmp("::", address) == 0)
+               ((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback;
+}
+#endif
+
+/*
+ * handleCachedPackets
+ *             Deal with cached packets.
+ */
+static void
+handleCachedPackets(void)
+{
+       UDPConn *cachedConn = NULL;
+       UDPConn *setupConn = NULL;
+       ConnHtabBin *bin = NULL;
+       icpkthdr   *pkt = NULL;
+       AckSendParam param;
+       int                     i = 0;
+       uint32          j = 0;
+       bool            dummy;
+
+       for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
+       {
+               bin = ic_control_info.startupCacheHtab.table[i];
+
+               while (bin)
+               {
+                       cachedConn = bin->conn;
+                       setupConn = NULL;
+
+                       for (j = 0; j < cachedConn->pkt_q_size; j++)
+                       {
+                               pkt = (icpkthdr *) cachedConn->pkt_q[j];
+
+                               if (pkt == NULL)
+                                       continue;
+
+                               rx_buffer_pool.maxCount--;
+
+                               /* look up this pkt's connection in connHtab */
+                               setupConn = ic_control_info.connHtab.find(pkt);
+                               if (setupConn == NULL)
+                               {
+                                       /* mismatch! */
+                                       rx_buffer_pool.put(pkt);
+                                       cachedConn->pkt_q[j] = NULL;
+                                       continue;
+                               }
+
+                               memset(&param, 0, sizeof(param));
+                               if (!handleDataPacket(setupConn, pkt, 
&cachedConn->peer, &cachedConn->peer_len, &param, &dummy))
+                               {
+                                       /* no need to cache this packet */
+                                       rx_buffer_pool.put(pkt);
+                               }
+
+                               ic_statistics.recvPktNum++;
+                               if (param.msg.len != 0)
+                                       UDPConn::sendAckWithParam(&param);
+
+                               cachedConn->pkt_q[j] = NULL;
+                       }
+                       bin = bin->next;
+                       ic_control_info.startupCacheHtab.remove(cachedConn);
+
+                       /*
+                        * MPP-19981 free the cached connections; otherwise 
memory leak
+                        * would be introduced.
+                        */
+                       ic_free(cachedConn->pkt_q);
+                       delete cachedConn;
+               }
+       }
+}
+
+/*
+ * CChunkTransportStateImpl::setup
+ *             Internal function for setting up UDP interconnect.
+ */
+ICChunkTransportState*
+CChunkTransportStateImpl::setup(ICSliceTable *sliceTable)
+{
+       pthread_mutex_lock(&ic_control_info.lock);
+
+       Assert(sliceTable->ic_instance_id > 0);
+
+       if (global_param.Gp_role == GP_ROLE_DISPATCH_IC)
+       {
+               /*
+                * QD use cursorHistoryTable to handle mismatch packets, no
+                * need to update ic_control_info.ic_instance_id
+                */
+               Assert(session_param.gp_interconnect_id == 
sliceTable->ic_instance_id);
+       }
+       else
+       {
+               /*
+                * update ic_control_info.ic_instance_id, it is mainly used
+                * by rx thread to handle mismatch packets
+                */
+               ic_control_info.ic_instance_id = sliceTable->ic_instance_id;
+       }
+
+       CChunkTransportStateImpl *state_impl = new 
CChunkTransportStateImpl(sliceTable);
+       ICChunkTransportState *interconnect_context = 
static_cast<ICChunkTransportState*>(state_impl);
+       CChunkTransportStateImpl::state_ = 
static_cast<CChunkTransportState*>(state_impl);
+
+#ifdef USE_ASSERT_CHECKING
+       ICExecSlice *mySlice = 
&interconnect_context->sliceTable->slices[sliceTable->localSlice];
+       Assert(mySlice && mySlice->sliceIndex == sliceTable->localSlice);
+#endif
+
+#ifdef USE_ASSERT_CHECKING
+       set_test_mode();
+#endif
+
+       if (global_param.Gp_role == GP_ROLE_DISPATCH_IC)
+       {
+               CursorICHistoryTable *ich_table = 
&rx_control_info.cursorHistoryTable;
+               //DistributedTransactionId distTransId = 
getDistributedTransactionId(); TODO: add callback;
+               DistributedTransactionId distTransId = InvalidTransactionId;
+
+       if (ich_table->count > (2 * ich_table->size))
+       {
+           /*
+            * distTransId != lastDXatId
+            * Means the last transaction is finished, it's ok to make a prune.
+            */
+            if (distTransId != rx_control_info.lastDXatId)
+            {
+                if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                    LOG(DEBUG1, "prune cursor history table (count %d), icid 
%d, prune_id %d",
+                        ich_table->count, sliceTable->ic_instance_id, 
sliceTable->ic_instance_id);
+                          ich_table->prune(sliceTable->ic_instance_id);
+            }
+            /*
+             * distTransId == lastDXatId and they are not 
InvalidTransactionId(0)
+             * Means current (non Read-Only) transaction isn't finished, 
should not prune.
+             */
+            else if (rx_control_info.lastDXatId != InvalidTransactionId)
+            {
+                ;
+            }
+            /*
+             * distTransId == lastDXatId and they are InvalidTransactionId(0)
+             * Means they are the same transaction or different Read-Only 
transactions.
+             *
+             * For the latter, it's hard to get a perfect timepoint to prune: 
prune eagerly may
+             * cause problems (pruned current Txn's Ic instances), but prune 
in low frequency
+             * causes memory leak.
+             *
+             * So, we choose a simple algorithm to prune it here. And if it 
mistakenly prune out
+             * the still-in-used Ic instance (with lower id), the query may 
hang forever.
+             * Then user have to set a bigger 
gp_interconnect_cursor_ic_table_size value and
+             * try the query again, it is a workaround.
+             *
+             * More backgrounds please see: 
https://github.com/greenplum-db/gpdb/pull/16458
+             */
+            else
+            {
+                if (sliceTable->ic_instance_id > ich_table->size)
+                {
+                    uint32 prune_id = sliceTable->ic_instance_id - 
ich_table->size;
+                    Assert(prune_id < sliceTable->ic_instance_id);
+ 
+                    if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                        LOG(DEBUG1, "prune cursor history table (count %d), 
icid %d, prune_id %d",
+                            ich_table->count, sliceTable->ic_instance_id, 
prune_id);
+                    ich_table->prune(prune_id);
+               }
+                       }
+               }
+
+               ich_table->add(sliceTable->ic_instance_id, 
session_param.gp_command_count);
+               /* save the latest transaction id */
+               rx_control_info.lastDXatId = distTransId;
+       }
+
+       /* Initiate receiving connections. */
+       state_impl->CreateRecvEntries(sliceTable);
+
+       /* Initiate outgoing connections. */
+       state_impl->CreateSendEntries(sliceTable);
+
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+               LOG(DEBUG1, "SetupUDPInterconnect will activate Listening on 
ports=%d/%d sockfd=%d.", 0, UDP2_GetListenPortUDP(), UDP_listenerFd);
+
+       /*
+        * If there are packets cached by background thread, add them to the
+        * connections.
+        */
+       if (session_param.gp_interconnect_cache_future_packets)
+               handleCachedPackets();
+
+       interconnect_context->activated = true;
+
+       pthread_mutex_unlock(&ic_control_info.lock);
+
+       return interconnect_context;
+}
+
+/*
+ * sendControlMessage
+ *             Helper function to send a control message.
+ */
+void
+UDPConn::sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, 
socklen_t peerLen)
+{
+       int                     n;
+
+#ifdef USE_ASSERT_CHECKING
+       if (testmode_inject_fault(session_param.gp_udpic_dropacks_percent))
+       {
+#ifdef AMS_VERBOSE_LOGGING
+               LOG(INFO, "THROW CONTROL MESSAGE with seq %d extraSeq %d srcpid 
%d despid %d", pkt->seq, pkt->extraSeq, pkt->srcPid, pkt->dstPid);
+#endif
+               return;
+       }
+#endif
+
+       /* Add CRC for the control message. */
+       if (session_param.gp_interconnect_full_crc)
+               addCRC(pkt);
+
+       /* retry 10 times for sending control message */
+       int counter = 0;
+       while (counter < 10)
+       {
+               counter++;
+               n = sendto(fd, (const char *)pkt, pkt->len, 0, addr, peerLen);
+               if (n < 0)
+               {
+                       if (errno == EINTR || errno == EAGAIN || errno == 
EWOULDBLOCK)
+                               continue;
+                       else
+                       {
+                               LOG(INFO, "sendcontrolmessage: got errno %d", 
errno);
+                               return;
+                       }
+               }
+               break;
+       }
+       if (n < int(pkt->len))
+               LOG(INFO, "sendcontrolmessage: got error %d errno %d seq %d", 
n, errno, pkt->seq);
+}
+
+/*
+ * setAckParam
+ *             Set the ack sending parameters.
+ */
+void
+UDPConn::setAckParam(AckSendParam *param, int32 flags, uint32 seq, uint32 
extraSeq)
+{
+       memcpy(&param->msg, (char *) &this->conn_info, sizeof(icpkthdr));
+       param->msg.flags = flags;
+       param->msg.seq   = seq;
+       param->msg.extraSeq = extraSeq;
+       param->msg.len   = sizeof(icpkthdr);
+       param->peer      = this->peer;
+       param->peer_len  = this->peer_len;
+}
+
+/*
+ * sendAckWithParam
+ *             Send acknowledgment to sender.
+ */
+void
+UDPConn::sendAckWithParam(AckSendParam *param)
+{
+       sendControlMessage(&param->msg, UDP_listenerFd, (struct sockaddr *) 
&param->peer, param->peer_len);
+}
+
+/*
+ * sendAck
+ *             Send acknowledgment to sender.
+ */
+void
+UDPConn::sendAck(int32 flags, uint32 seq, uint32 extraSeq)
+{
+       icpkthdr        msg;
+
+       memcpy(&msg, (char *) &this->conn_info, sizeof(msg));
+       msg.flags = flags;
+       msg.seq   = seq;
+       msg.extraSeq = extraSeq;
+       msg.len = sizeof(icpkthdr);
+
+       LOG(DEBUG1, "sendack: node %d route %d seq %d extraSeq %d, flags %s", 
msg.motNodeId, this->route, msg.seq, msg.extraSeq, flags2txt(msg.flags));
+
+       sendControlMessage(&msg, UDP_listenerFd, (struct sockaddr *) 
&this->peer, this->peer_len);
+}
+
+/*
+ * sendDisorderAck
+ *             Send a disorder message to the sender.
+ *
+ * Whenever the receiver detects a disorder packet, it will assemble a 
disorder message
+ * which contains the sequence numbers of the possibly lost packets.
+ *
+ */
+void
+UDPConn::sendDisorderAck(uint32 seq, uint32 extraSeq, uint32 lostPktCnt)
+{
+       icpkthdr   *disorderBuffer = rx_control_info.disorderBuffer;
+
+       memcpy(disorderBuffer, (char *) &this->conn_info, sizeof(icpkthdr));
+
+       disorderBuffer->flags |= UDPIC_FLAGS_DISORDER;
+       disorderBuffer->seq = seq;
+       disorderBuffer->extraSeq = extraSeq;
+       disorderBuffer->len = lostPktCnt * sizeof(uint32) + sizeof(icpkthdr);
+
+#ifdef AMS_VERBOSE_LOGGING
+       if (!(this->peer.ss_family == AF_INET || this->peer.ss_family == 
AF_INET6))
+       {
+               LOG(INFO, "UDP Interconnect bug (in sendDisorderAck): trying to 
send ack when we don't know where to send to %s", this->remoteHostAndPort);
+       }
+#endif
+
+       sendControlMessage(disorderBuffer, UDP_listenerFd, (struct sockaddr *) 
&this->peer, this->peer_len);
+}
+
+/*
+ * sendStatusQueryMessage
+ *             Used by senders to send a status query message for a connection 
to receivers.
+ *
+ * When receivers get such a message, they will respond with
+ * the connection status (consumed seq, received seq ...).
+ */
+void
+UDPConn::sendStatusQueryMessage(uint32 seq)
+{
+       icpkthdr        msg;
+
+       memcpy(&msg, (char *) &this->conn_info, sizeof(msg));
+       msg.flags = UDPIC_FLAGS_CAPACITY;
+       msg.seq = seq;
+       msg.extraSeq = 0;
+       msg.len = sizeof(msg);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+       trans_proto_stats.update(TPE_ACK_PKT_QUERY, &msg);
+#endif
+
+       sendControlMessage(&msg, entry_->txfd, (struct sockaddr *) &this->peer, 
this->peer_len);
+}
+
+/*
+ * ReleaseBuffer
+ *             Return a buffer and send an acknowledgment.
+ *
+ *  SHOULD BE CALLED WITH ic_control_info.lock *LOCKED*
+ */
+void
+UDPConn::ReleaseBuffer(AckSendParam *param)
+{
+       icpkthdr   *buf;
+       uint32          seq;
+
+       buf = (icpkthdr *) this->pkt_q[this->pkt_q_head];
+       if (buf == NULL)
+       {
+               pthread_mutex_unlock(&ic_control_info.lock);
+               throw ICFatalException("FATAL: ReleaseBuffer: buffer is NULL", 
__FILE__, __LINE__);
+       }
+
+       seq = buf->seq;
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "LOG: ReleaseBuffer conn %p pkt [seq %d] for node %d route 
%d, [head seq] %d queue size %d, queue head %d queue tail %d",
+                         this, seq, buf->motNodeId, this->route, 
this->conn_info.seq - this->pkt_q_size, this->pkt_q_size, this->pkt_q_head, 
this->pkt_q_tail);
+#endif
+
+       this->pkt_q[this->pkt_q_head] = NULL;
+       this->pBuff = NULL;
+       this->pkt_q_head = (this->pkt_q_head + 1) % this->pkt_q_capacity;
+       this->pkt_q_size--;
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "LOG: ReleaseBuffer conn %p pkt [seq %d] for node %d route 
%d, [head seq] %d queue size %d, queue head %d queue tail %d",
+                         this, seq, buf->motNodeId, this->route, 
this->conn_info.seq - this->pkt_q_size, this->pkt_q_size, this->pkt_q_head, 
this->pkt_q_tail);
+#endif
+
+       rx_buffer_pool.put(buf);
+       this->conn_info.extraSeq = seq;
+
+       /* Send an Ack to the sender. */
+       if ((seq % 2 == 0) || (this->pkt_q_capacity == 1))
+       {
+               if (param != NULL)
+               {
+                       this->setAckParam(param, UDPIC_FLAGS_ACK | 
UDPIC_FLAGS_CAPACITY | this->conn_info.flags, this->conn_info.seq - 1, seq);
+               }
+               else
+               {
+                       this->sendAck(UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY | 
this->conn_info.flags, this->conn_info.seq - 1, seq);
+               }
+       }
+}
+
+/*
+ * computeExpirationPeriod
+ *             Compute expiration period according to the connection 
information.
+ *
+ * Considerations on expiration period computation:
+ *
+ * RTT is dynamically computed, and expiration period is based on RTT values.
+ * We cannot simply use RTT as the expiration value, since real workload does
+ * not always have a stable RTT. A small constant value is multiplied to the 
RTT value
+ * to make the resending logic insensitive to the frequent small changes of 
RTT.
+ *
+ */
+uint64
+UDPConn::computeExpirationPeriod(uint32 retry)
+{
+       /*
+        * In fault injection mode, we often use DEFAULT_RTT, because the
+        * intentional large percent of packet/ack losses will make the RTT too
+        * large. This will lead to a slow retransmit speed. In real hardware
+        * environment/workload, we do not expect such a packet loss pattern.
+        */
+#ifdef USE_ASSERT_CHECKING
+       if (udp_testmode)
+       {
+               return DEFAULT_RTT;
+       }
+       else
+#endif
+       {
+               uint32 factor = (retry <= 12 ? retry : 12);
+               return Max(MIN_EXPIRATION_PERIOD, Min(MAX_EXPIRATION_PERIOD, 
(int)(this->rtt + (this->dev << 2)) << (factor)));
+       }
+}
+
+/*
+ * freeDisorderedPackets
+ *             Put the disordered packets into free buffer list.
+ */
+void
+UDPConn::freeDisorderedPackets()
+{
+       uint32          k;
+
+       if (this->pkt_q == NULL)
+               return;
+
+       for (k = 0; k < this->pkt_q_capacity; k++)
+       {
+               icpkthdr   *buf = (icpkthdr *)this->pkt_q[k];
+
+               if (buf != NULL)
+               {
+                       if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC)
+                               LOG(DEBUG1, "CLEAR Out-of-order PKT: conn %p 
pkt [seq %d] for node %d route %d, "
+                                                       "[head seq] %d queue 
size %d, queue head %d queue tail %d",
+                                                       this, buf->seq, 
buf->motNodeId, this->route, this->conn_info.seq - this->pkt_q_size,
+                                                       this->pkt_q_size, 
this->pkt_q_head, this->pkt_q_tail);
+
+                       /* return the buffer into the free list. */
+                       rx_buffer_pool.put(buf);
+                       this->pkt_q[k] = NULL;
+               }
+       }
+}
+
+/*
+ * prepareRxConnForRead
+ *             Prepare the receive connection for reading.
+ *
+ * MUST BE CALLED WITH ic_control_info.lock LOCKED.
+ */
+void
+UDPConn::prepareRxConnForRead()
+{
+       LOG(DEBUG3, "In prepareRxConnForRead: conn %p, q_head %d q_tail %d 
q_size %d",
+                       this, this->pkt_q_head, this->pkt_q_tail, 
this->pkt_q_size);
+
+       Assert(this->pkt_q[this->pkt_q_head] != NULL);
+       this->pBuff     = this->pkt_q[this->pkt_q_head];
+       this->msgPos    = this->pBuff;
+       this->msgSize   = ((icpkthdr *) this->pBuff)->len;
+       this->recvBytes = this->msgSize;
+}
+
+/*
+ * DeactiveConn
+ *             Mark the connection inactive.
+ */
+void
+UDPConn::DeactiveConn()
+{
+       pthread_mutex_lock(&ic_control_info.lock);
+       this->stillActive = false;
+       pthread_mutex_unlock(&ic_control_info.lock);
+}
+
+/*
+ * handleAckedPacket
+ *             Called by sender to process acked packet.
+ *
+ *     Remove it from unack queue and unack queue ring, change the rtt ...
+ *
+ *     RTT (Round Trip Time) is computed as the time between we send the packet
+ *     and receive the acknowledgement for the packet. When an acknowledgement
+ *     is received, an estimated RTT value (called SRTT, smoothed RTT) is 
updated
+ *     by using the following equation. And we also set a limitation of the max
+ *     value and min value for SRTT.
+ *         (1) SRTT = (1 - g) SRTT + g x RTT (0 < g < 1)
+ *     where RTT is the measured round trip time of the packet. In 
implementation,
+ *     g is set to 1/8. In order to compute expiration period, we also compute 
an
+ *     estimated delay variance SDEV by using:
+ *         (2) SDEV = (1 - h) x SDEV + h x |SERR| (0 < h < 1, In 
implementation, h is set to 1/4)
+ *     where SERR is calculated by using:
+ *         (3) SERR = RTT - SRTT
+ *     Expiration period determines the timing we resend a packet. A long RTT 
means
+ *     a long expiration period. Delay variance is used to incorporate the 
variance
+ *     of workload/network variances at different time. When a packet is 
retransmitted,
+ *     we back off exponentially the expiration period.
+ *         (4) exp_period = (SRTT + y x SDEV) << retry
+ *     Here y is a constant (In implementation, we use 4) and retry is the 
times the
+ *     packet is retransmitted.
+ */
+void
+UDPConn::handleAckedPacket(ICBuffer *buf, uint64 now)
+{
+       uint64          ackTime = 0;
+       bool            bufIsHead = false; 
+       UDPConn *bufConn = NULL;
+
+       bufIsHead = (&buf->primary == this->unackQueue.first());
+
+       buf = this->unackQueue.remove(buf);
+
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+       {
+               ICBufferList *alist = 
&unack_queue_ring.slots[buf->unackQueueRingSlot];
+               buf = alist->remove(buf);
+               unack_queue_ring.numOutStanding--;
+               if (this->unackQueue.length() >= 1)
+                       unack_queue_ring.numSharedOutStanding--;
+
+               ackTime = now - buf->sentTime;
+
+               /*
+                * In udp_testmode, we do not change rtt dynamically due to the 
large
+                * number of packet losses introduced by fault injection code. 
This
+                * can decrease the testing time.
+                */
+#ifdef USE_ASSERT_CHECKING
+               if (!udp_testmode)
+#endif
+               {
+                       uint64          newRTT = 0;
+                       uint64          newDEV = 0;
+
+                       if (buf->nRetry == 0)
+                       {
+                               bufConn = static_cast<UDPConn*>(buf->conn);
+                               newRTT = bufConn->rtt - (bufConn->rtt >> 
RTT_SHIFT_COEFFICIENT) + (ackTime >> RTT_SHIFT_COEFFICIENT);
+                               newRTT = Min(MAX_RTT, Max(newRTT, MIN_RTT));
+                               bufConn->rtt = newRTT;
+
+                               newDEV = bufConn->dev - (bufConn->dev >> 
DEV_SHIFT_COEFFICIENT) + ((Max(ackTime, newRTT) - Min(ackTime, newRTT)) >> 
DEV_SHIFT_COEFFICIENT);
+                               newDEV = Min(MAX_DEV, Max(newDEV, MIN_DEV));
+                               bufConn->dev = newDEV;
+
+                               /* adjust the congestion control window. */
+                               if (snd_control_info.cwnd < 
snd_control_info.ssthresh)
+                                       snd_control_info.cwnd += 1;
+                               else
+                                       snd_control_info.cwnd += 1 / 
snd_control_info.cwnd;
+                               snd_control_info.cwnd = 
Min(snd_control_info.cwnd, snd_buffer_pool.maxCount);
+                       }
+               }
+       }
+
+       bufConn = static_cast<UDPConn*>(buf->conn);
+       bufConn->stat_total_ack_time += ackTime;
+       bufConn->stat_max_ack_time = Max(ackTime, bufConn->stat_max_ack_time);
+       bufConn->stat_min_ack_time = Min(ackTime, bufConn->stat_min_ack_time);
+
+       /*
+        * only change receivedAckSeq when it is the smallest pkt we sent and 
have
+        * not received ack for it.
+        */
+       if (bufIsHead)
+               this->receivedAckSeq = buf->pkt->seq;
+
+       /* The first packet acts like a connect setup packet */
+       if (buf->pkt->seq == 1)
+               this->state = mcsStarted;
+
+       snd_buffer_pool.freeList.append(buf);
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "REMOVEPKT %d from unack queue for route %d (retry %d) 
sndbufmaxcount %d sndbufcount %d "
+                         "sndbuffreelistlen %d, sntSeq %d consumedSeq %d 
recvAckSeq %d capacity %d, sndQ %d, unackQ %d",
+                         buf->pkt->seq, this->route, buf->nRetry, 
snd_buffer_pool.maxCount, snd_buffer_pool.count,
+                         snd_buffer_pool.freeList.length(), bufConn->sentSeq, 
bufConn->consumedSeq,
+                         bufConn->receivedAckSeq, bufConn->capacity, 
bufConn->sndQueue.length(),
+                         bufConn->unackQueue.length());
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+       {
+               bufConn->unackQueue.icBufferListLog();
+               bufConn->sndQueue.icBufferListLog();
+       }
+#endif
+}
+
+/*
+ * dispatcherAYT
+ *             Check the connection from the dispatcher to verify that it is 
still there.
+ *
+ * The connection is a struct Port, stored in the global MyProcPort.
+ *
+ * Return true if the dispatcher connection is still alive.
+ */
+static bool
+dispatcherAYT(void)
+{
+       ssize_t         ret;
+       char            buf;
+
+       /*
+        * For background worker or auxiliary process like gdd, there is no 
client.
+        * As a result, MyProcPort is NULL. We should skip dispatcherAYT check 
here.
+        */
+       if (global_param.MyProcPort == false)
+               return true;
+
+       if (global_param.myprocport_sock < 0)
+               return false;
+
+#ifndef WIN32
+       ret = recv(global_param.myprocport_sock, &buf, 1, MSG_PEEK | 
MSG_DONTWAIT);
+#else
+       ret = recv(global_param.myprocport_sock, &buf, 1, MSG_PEEK | 
MSG_PARTIAL);
+#endif
+
+       if (ret == 0)                           /* socket has been closed. EOF 
*/
+               return false;
+
+       if (ret > 0)                            /* data waiting on socket, it 
must be OK. */
+               return true;
+
+       if (ret == -1)                          /* error, or would be block. */
+       {
+               if (errno == EAGAIN || errno == EINPROGRESS)
+                       return true;            /* connection intact, no data 
available */
+               else
+                       return false;
+       }
+       /* not reached */
+
+       return true;
+}
+
+/*
+ * checkQDConnectionAlive
+ *             Check whether QD connection is still alive. If not, report 
error.
+ */
+static void
+checkQDConnectionAlive(void)
+{
+       if (!dispatcherAYT())
+       {
+               if (global_param.Gp_role == GP_ROLE_EXECUTE_IC)
+                       throw ICNetworkException("interconnect error segment 
lost contact with master (recv)", __FILE__, __LINE__);
+               else
+                       throw ICNetworkException("interconnect error master 
lost contact with client (recv)", __FILE__, __LINE__);
+       }
+}
+
+/*
+ * getCurrentTime
+ *             get current time
+ *
+ */
+static uint64
+getCurrentTime(void)
+{
+       struct timeval newTime;
+       int                     status = 1;
+       uint64          t = 0;
+
+#if HAVE_LIBRT
+       /* Use clock_gettime to return monotonic time value. */
+       struct timespec ts;
+
+       status = clock_gettime(CLOCK_MONOTONIC, &ts);
+
+       newTime.tv_sec = ts.tv_sec;
+       newTime.tv_usec = ts.tv_nsec / 1000;
+
+#endif
+
+       if (status != 0)
+               gettimeofday(&newTime, NULL);
+
+       t = ((uint64) newTime.tv_sec) * USECS_PER_SECOND + newTime.tv_usec;
+       return t;
+}
+
+/*
+ * putIntoUnackQueueRing
+ *             Put the buffer into the ring.
+ *
+ * expTime - expiration time from now
+ *
+ */
+static void
+putIntoUnackQueueRing(UnackQueueRing *uqr, ICBuffer *buf, uint64 expTime, 
uint64 now)
+{
+       uint64          diff = 0;
+       int                     idx = 0;
+
+       /* The first packet, currentTime is not initialized */
+       if (uqr->currentTime == 0)
+               uqr->currentTime = now - (now % TIMER_SPAN);
+
+       diff = now + expTime - uqr->currentTime;
+       if (diff >= UNACK_QUEUE_RING_LENGTH)
+       {
+#ifdef AMS_VERBOSE_LOGGING
+               LOG(INFO, "putIntoUnackQueueRing:" "now %lu expTime %lu diff 
%lu uqr-currentTime %lu", now, expTime, diff, uqr->currentTime);
+#endif
+               diff = UNACK_QUEUE_RING_LENGTH - 1;
+       }
+       else if (diff < TIMER_SPAN)
+       {
+               diff = TIMER_SPAN;
+       }
+
+       idx = (uqr->idx + diff / TIMER_SPAN) % UNACK_QUEUE_RING_SLOTS_NUM;
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "PUTTW: curtime %lu now %lu (diff %lu) expTime %lu previdx 
%d, nowidx %d, nextidx %d", uqr->currentTime, now, diff, expTime, 
buf->unackQueueRingSlot, uqr->idx, idx);
+#endif
+
+       buf->unackQueueRingSlot = idx;
+       unack_queue_ring.slots[idx].append(buf);
+}
+
+/*
+ * handleDataPacket
+ *             Handling the data packet.
+ *
+ * On return, will set *wakeup_mainthread, if a packet was received 
successfully
+ * and the caller should wake up the main thread, after releasing the mutex.
+ */
+static bool
+handleDataPacket(UDPConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, 
socklen_t *peerlen,
+                                AckSendParam *param, bool *wakeup_mainthread)
+{
+       if ((pkt->len == sizeof(icpkthdr)) && (pkt->flags & 
UDPIC_FLAGS_CAPACITY))
+       {
+               if (IC_DEBUG1 >= session_param.log_min_messages)
+                       LOG(DEBUG1, "status queuy message received, seq %d, 
srcpid %d, dstpid %d, icid %d, sid %d",
+                                         pkt->seq, pkt->srcPid, pkt->dstPid, 
pkt->icId, pkt->sessionId);
+
+#ifdef AMS_VERBOSE_LOGGING
+               logPkt("STATUS QUERY MESSAGE", pkt);
+#endif
+               uint32          seq = conn->conn_info.seq > 0 ? 
conn->conn_info.seq - 1 : 0;
+               uint32          extraSeq = conn->stopRequested ? seq : 
conn->conn_info.extraSeq;
+
+               conn->setAckParam(param, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK 
| conn->conn_info.flags, seq, extraSeq);
+
+               return false;
+       }
+
+       /*
+        * when we're not doing a full-setup on every statement, we've got to
+        * update the peer info -- full setups do this at setup-time.
+        */
+
+       /*
+        * Note the change here, for process start race and disordered message, 
if
+        * we do not fill in peer address, then we may send some acks to unknown
+        * address. Thus, the following condition is used.
+        *
+        */
+       if (pkt->seq <= conn->pkt_q_capacity)
+       {
+               /* fill in the peer.  Need to cast away "volatile".  ugly */
+               memset((void *) &conn->peer, 0, sizeof(conn->peer));
+               memcpy((void *) &conn->peer, peer, *peerlen);
+               conn->peer_len = *peerlen;
+
+               conn->conn_info.dstListenerPort = pkt->dstListenerPort;
+               if (IC_DEBUG2 >= session_param.log_min_messages)
+                       LOG(DEBUG2, "received the head packets when eliding 
setup, pkt seq %d", pkt->seq);
+       }
+
+       /* data packet */
+       if (pkt->flags & UDPIC_FLAGS_EOS)
+       {
+               if (IC_DEBUG3 >= session_param.log_min_messages)
+                       LOG(DEBUG3, "received packet with EOS motid %d route %d 
seq %d",
+                                               pkt->motNodeId, conn->route, 
pkt->seq);
+       }
+
+       /*
+        * if we got a stop, but didn't request a stop -- ignore, this is a
+        * startup blip: we must have acked with a stop -- we don't want to do
+        * anything further with the stop-message if we didn't request a stop!
+        *
+        * this is especially important after eliding setup is enabled.
+        */
+       if (!conn->stopRequested && (pkt->flags & UDPIC_FLAGS_STOP))
+       {
+               if (pkt->flags & UDPIC_FLAGS_EOS)
+               {
+                       LOG(INFO, "non-requested stop flag, EOS! seq %d, flags 
0x%x", pkt->seq, pkt->flags);
+               }
+               return false;
+       }
+
+       if (conn->stopRequested && conn->stillActive)
+       {
+               if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_DEBUG_IC && IC_DEBUG5 >= session_param.log_min_messages)
+                       LOG(DEBUG5, "rx_thread got packet on active connection 
marked stopRequested. "
+                                               "(flags 0x%x) node %d route %d 
pkt seq %d conn seq %d",
+                                               pkt->flags, pkt->motNodeId, 
conn->route, pkt->seq, conn->conn_info.seq);
+
+               /* can we update stillActive ? */
+               if (IC_DEBUG2 >= session_param.log_min_messages)
+                       if (!(pkt->flags & UDPIC_FLAGS_STOP) && !(pkt->flags & 
UDPIC_FLAGS_EOS))
+                               LOG(DEBUG2, "stop requested but no stop flag on 
return packet ?!");
+
+               if (pkt->flags & UDPIC_FLAGS_EOS)
+                       conn->conn_info.flags |= UDPIC_FLAGS_EOS;
+
+               if (conn->conn_info.seq < pkt->seq)
+                       conn->conn_info.seq = pkt->seq; /* note here */
+
+               conn->setAckParam(param, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | 
UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
+
+               /* we only update stillActive if eos has been sent by peer. */
+               if (pkt->flags & UDPIC_FLAGS_EOS)
+               {
+                       if (IC_DEBUG2 >= session_param.log_min_messages)
+                               LOG(DEBUG2, "stop requested and acknowledged by 
sending peer");
+                       conn->stillActive = false;
+               }
+
+               return false;
+       }
+
+       /* dropped ack or timeout */
+       if (pkt->seq < conn->conn_info.seq)
+       {
+               ic_statistics.duplicatedPktNum++;
+               if (IC_DEBUG3 >= session_param.log_min_messages)
+                       LOG(DEBUG3, "dropped ack ? ignored data packet w/ cmd 
%d conn->cmd %d node %d route %d seq %d expected %d flags 0x%x",
+                                               pkt->icId, 
conn->conn_info.icId, pkt->motNodeId, conn->route, pkt->seq, 
conn->conn_info.seq, pkt->flags);
+
+               conn->setAckParam(param, UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY 
| conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+
+               return false;
+       }
+
+       /* sequence number is correct */
+       if (!conn->stillActive)
+       {
+               /* peer may have dropped ack */
+               if (session_param.gp_log_interconnect >= 
GPVARS_VERBOSITY_VERBOSE_IC &&
+                       IC_DEBUG1 >= session_param.log_min_messages)
+                       LOG(DEBUG1, "received on inactive connection node %d 
route %d (seq %d pkt->seq %d)",
+                                               pkt->motNodeId, conn->route, 
conn->conn_info.seq, pkt->seq);
+
+               if (conn->conn_info.seq < pkt->seq)
+                       conn->conn_info.seq = pkt->seq;
+               conn->setAckParam(param, UDPIC_FLAGS_ACK | UDPIC_FLAGS_STOP | 
UDPIC_FLAGS_CAPACITY | conn->conn_info.flags, pkt->seq, pkt->seq);
+
+               return false;
+       }
+
+       /* headSeq is the seq for the head packet. */
+       uint32          headSeq = conn->conn_info.seq - conn->pkt_q_size;
+
+       if ((conn->pkt_q_size == conn->pkt_q_capacity) || (pkt->seq - headSeq 
>= conn->pkt_q_capacity))
+       {
+               /*
+                * Error case: NO RX SPACE or out of range pkt This indicates a 
bug.
+                */
+               logPkt("Interconnect error: received a packet when the queue is 
full ", pkt);
+               ic_statistics.disorderedPktNum++;
+               conn->stat_count_dropped++;
+               return false;
+       }
+
+       /* put the packet at the his position */
+       bool            toWakeup = false;
+
+       int                     pos = (pkt->seq - 1) % conn->pkt_q_capacity;
+
+       if (conn->pkt_q[pos] == NULL)
+       {
+               conn->pkt_q[pos] = (uint8 *) pkt;
+               if (pos == conn->pkt_q_head)
+               {
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "SAVE pkt at QUEUE HEAD [seq %d] for node %d 
route %d, queue head seq %d, queue size %d, queue head %d queue tail %d",
+                                         pkt->seq, pkt->motNodeId, 
conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
+#endif
+                       toWakeup = true;
+               }
+
+               if (pos == conn->pkt_q_tail)
+               {
+                       /* move the queue tail */
+                       for (; conn->pkt_q[conn->pkt_q_tail] != NULL && 
conn->pkt_q_size < conn->pkt_q_capacity;)
+                       {
+                               conn->pkt_q_size++;
+                               conn->pkt_q_tail = (conn->pkt_q_tail + 1) % 
conn->pkt_q_capacity;
+                               conn->conn_info.seq++;
+                       }
+
+                       /* set the EOS flag */
+                       if (((icpkthdr *) (conn->pkt_q[(conn->pkt_q_tail + 
conn->pkt_q_capacity - 1) % conn->pkt_q_capacity]))->flags & UDPIC_FLAGS_EOS)
+                       {
+                               conn->conn_info.flags |= UDPIC_FLAGS_EOS;
+                               if (IC_DEBUG1 >= session_param.log_min_messages)
+                                       LOG(DEBUG1, "RX_THREAD: the packet with 
EOS flag is available for access in the queue for route %d", conn->route);
+                       }
+
+                       /* ack data packet */
+                       conn->setAckParam(param, UDPIC_FLAGS_CAPACITY | 
UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, 
conn->conn_info.extraSeq);
+
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "SAVE conn %p pkt at QUEUE TAIL [seq %d] at 
pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head %d 
queue tail %d",
+                                         conn, pkt->seq, pos, pkt->motNodeId, 
conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
+#endif
+               }
+               else                                    /* deal with 
out-of-order packet */
+               {
+                       if (IC_DEBUG1 >= session_param.log_min_messages)
+                               LOG(DEBUG1, "SAVE conn %p OUT-OF-ORDER pkt [seq 
%d] at pos [%d] for node %d route %d, [head seq] %d, queue size %d, queue head 
%d queue tail %d",
+                                                       conn, pkt->seq, pos, 
pkt->motNodeId, conn->route, headSeq, conn->pkt_q_size, conn->pkt_q_head, 
conn->pkt_q_tail);
+
+                       /* send an ack for out-of-order packet */
+                       ic_statistics.disorderedPktNum++;
+                       conn->handleDisorderPacket(pos, headSeq + 
conn->pkt_q_size, pkt);
+               }
+       }
+       else                                            /* duplicate pkt */
+       {
+               if (IC_DEBUG1 >= session_param.log_min_messages)
+                       LOG(DEBUG1, "DUPLICATE pkt [seq %d], [head seq] %d, 
queue size %d, queue head %d queue tail %d",
+                                               pkt->seq, headSeq, 
conn->pkt_q_size, conn->pkt_q_head, conn->pkt_q_tail);
+
+               conn->setAckParam(param, UDPIC_FLAGS_DUPLICATE | 
conn->conn_info.flags, pkt->seq, conn->conn_info.seq - 1);
+               ic_statistics.duplicatedPktNum++;
+               return false;
+       }
+
+       /* Was the main thread waiting for something ? */
+       if (rx_control_info.mainWaitingState.waiting &&
+               rx_control_info.mainWaitingState.waitingNode == pkt->motNodeId 
&&
+               rx_control_info.mainWaitingState.waitingQuery == pkt->icId && 
toWakeup)
+       {
+               if (rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
+               {
+                       if (rx_control_info.mainWaitingState.reachRoute == 
ANY_ROUTE)
+                               rx_control_info.mainWaitingState.reachRoute = 
conn->route;
+               }
+               else if (rx_control_info.mainWaitingState.waitingRoute == 
conn->route)
+               {
+                       if (IC_DEBUG2 >= session_param.log_min_messages)
+                               LOG(DEBUG2, "rx thread: main_waiting waking it 
route %d", rx_control_info.mainWaitingState.waitingRoute);
+                       rx_control_info.mainWaitingState.reachRoute = 
conn->route;
+               }
+               /* WAKE MAIN THREAD HERE */
+               *wakeup_mainthread = true;
+       }
+
+       return true;
+}
+
+/*
+ * rxThreadFunc
+ *             Main function of the receive background thread.
+ *
+ * NOTE: This function MUST NOT contain elog or ereport statements.
+ * elog is NOT thread-safe.  Developers should instead use something like:
+ *
+ * NOTE: In threads, we cannot use palloc/pfree, because it's not thread safe.
+ */
+static void *
+rxThreadFunc(void *arg)
+{
+       icpkthdr   *pkt = NULL;
+       bool            skip_poll = false;
+
+       for (;;)
+       {
+               struct pollfd nfd;
+               int                     n;
+
+               /* check shutdown condition */
+               if (ic_atomic_read_u32(&ic_control_info.shutdown) == 1)
+               {
+                       if (IC_DEBUG1 >= session_param.log_min_messages)
+                               LOG(DEBUG1, "udp-ic: rx-thread shutting down");
+                       break;
+               }
+
+               /* Try to get a buffer */
+               if (pkt == NULL)
+               {
+                       pthread_mutex_lock(&ic_control_info.lock);
+                       pkt = rx_buffer_pool.get();
+                       pthread_mutex_unlock(&ic_control_info.lock);
+
+                       if (pkt == NULL)
+                       {
+                               setRxThreadError(ENOMEM);
+                               continue;
+                       }
+               }
+
+               if (!skip_poll)
+               {
+                       /* Do we have inbound traffic to handle ? */
+                       nfd.fd = UDP_listenerFd;
+                       nfd.events = POLLIN;
+
+                       n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT);
+
+                       if (ic_atomic_read_u32(&ic_control_info.shutdown) == 1)
+                       {
+                               if (IC_DEBUG1 >= session_param.log_min_messages)
+                                       LOG(DEBUG1, "udp-ic: rx-thread shutting 
down");
+                               break;
+                       }
+
+                       if (n < 0)
+                       {
+                               if (errno == EINTR)
+                                       continue;
+
+                               /*
+                                * ERROR case: if simply break out the loop 
here, there will
+                                * be a hung here, since main thread will never 
be waken up,
+                                * and senders will not get responses anymore.
+                                *
+                                * Thus, we set an error flag, and let main 
thread to report
+                                * an error.
+                                */
+                               setRxThreadError(errno);
+                               continue;
+                       }
+
+                       if (n == 0)
+                               continue;
+               }
+
+               if (skip_poll || (n == 1 && (nfd.events & POLLIN)))
+               {
+                       /* we've got something interesting to read */
+                       /* handle incoming */
+                       /* ready to read on our socket */
+                       int read_count = 0;
+
+                       struct sockaddr_storage peer;
+                       socklen_t       peerlen;
+
+                       peerlen = sizeof(peer);
+                       read_count = recvfrom(UDP_listenerFd, (char *) pkt, 
global_param.Gp_max_packet_size, 0,
+                                                                 (struct 
sockaddr *) &peer, &peerlen);
+
+                       if (ic_atomic_read_u32(&ic_control_info.shutdown) == 1)
+                       {
+                               if (IC_DEBUG1 >= session_param.log_min_messages)
+                                       LOG(DEBUG1, "udp-ic: rx-thread shutting 
down");
+                               break;
+                       }
+
+                       if (IC_DEBUG5 >= session_param.log_min_messages)
+                               LOG(DEBUG5, "received inbound len %d", 
read_count);
+
+                       if (read_count < 0)
+                       {
+                               skip_poll = false;
+
+                               if (errno == EWOULDBLOCK || errno == EINTR)
+                                       continue;
+
+                               LOG(LOG_ERROR, "Interconnect error: recvfrom 
(%d)", errno);
+
+                               /*
+                                * ERROR case: if simply break out the loop 
here, there will
+                                * be a hung here, since main thread will never 
be waken up,
+                                * and senders will not get responses anymore.
+                                *
+                                * Thus, we set an error flag, and let main 
thread to report
+                                * an error.
+                                */
+                               setRxThreadError(errno);
+                               continue;
+                       }
+
+                       if (static_cast<unsigned long>(read_count) < 
sizeof(icpkthdr))
+                       {
+                               if (IC_DEBUG1 >= session_param.log_min_messages)
+                                       LOG(DEBUG1, "Interconnect error: short 
conn receive (%d)", read_count);
+                               continue;
+                       }
+
+                       /*
+                        * when we get a "good" recvfrom() result, we can skip 
poll()
+                        * until we get a bad one.
+                        */
+                       skip_poll = true;
+
+                       /* length must be >= 0 */
+                       if (pkt->len < 0)
+                       {
+                               if (IC_DEBUG3 >= session_param.log_min_messages)
+                                       LOG(DEBUG3, "received inbound with 
negative length");
+                               continue;
+                       }
+
+                       if (pkt->len != static_cast<unsigned int>(read_count))
+                       {
+                               if (IC_DEBUG3 >= session_param.log_min_messages)
+                                       LOG(DEBUG3, "received inbound packet 
[%d], short: read %d bytes, pkt->len %d", pkt->seq, read_count, pkt->len);
+                               continue;
+                       }
+
+                       /*
+                        * check the CRC of the payload.
+                        */
+                       if (session_param.gp_interconnect_full_crc)
+                       {
+                               if (!checkCRC(pkt))
+                               {
+                                       
ic_atomic_add_fetch_u32((ic_atomic_uint32 *) &ic_statistics.crcErrors, 1);
+                                       if (IC_DEBUG2 >= 
session_param.log_min_messages)
+                                               LOG(DEBUG2, "received network 
data error, dropping bad packet, user data unaffected.");
+                                       continue;
+                               }
+                       }
+
+#ifdef AMS_VERBOSE_LOGGING
+                       logPkt("GOT MESSAGE", pkt);
+#endif
+
+                       bool            wakeup_mainthread = false;
+                       AckSendParam param;
+
+                       memset(&param, 0, sizeof(AckSendParam));
+
+                       /*
+                        * Get the connection for the pkt.
+                        *
+                        * The connection hash table should be locked until 
finishing the
+                        * processing of the packet to avoid the connection
+                        * addition/removal from the hash table during the mean 
time.
+                        */
+                       pthread_mutex_lock(&ic_control_info.lock);
+                       UDPConn *conn = ic_control_info.connHtab.find(pkt);
+                       if (conn != NULL)
+                       {
+                               /* Handling a regular packet */
+                               if (handleDataPacket(conn, pkt, &peer, 
&peerlen, &param, &wakeup_mainthread))
+                                       pkt = NULL;
+                               ic_statistics.recvPktNum++;
+                       }
+                       else
+                       {
+                               /*
+                                * There may have two kinds of Mismatched 
packets: a) Past
+                                * packets from previous command after I was 
torn down b)
+                                * Future packets from current command before 
my connections
+                                * are built.
+                                *
+                                * The handling logic is to "Ack the past and 
Nak the future".
+                                */
+                               if ((pkt->flags & 
UDPIC_FLAGS_RECEIVER_TO_SENDER) == 0)
+                               {
+                                       if (IC_DEBUG1 >= 
session_param.log_min_messages)
+                                               LOG(DEBUG1, "mismatched packet 
received, seq %d, srcpid %d, dstpid %d, icid %d, sid %d",
+                                                                       
pkt->seq, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->sessionId);
+
+#ifdef AMS_VERBOSE_LOGGING
+                                       logPkt("Got a Mismatched Packet", pkt);
+#endif
+
+                                       if (handleMismatch(pkt, &peer, peerlen))
+                                               pkt = NULL;
+                                       ic_statistics.mismatchNum++;
+                               }
+                       }
+                       pthread_mutex_unlock(&ic_control_info.lock);
+
+                       if (wakeup_mainthread) {
+                               cv.notify_one();
+                       }
+
+                       /*
+                        * real ack sending is after lock release to decrease 
the lock
+                        * holding time.
+                        */
+                       if (param.msg.len != 0)
+                               UDPConn::sendAckWithParam(&param);
+               }
+
+               /* pthread_yield(); */
+       }
+
+       /* Before return, we release the packet. */
+       if (pkt)
+       {
+               pthread_mutex_lock(&ic_control_info.lock);
+               rx_buffer_pool.release(pkt);
+               pkt = NULL;
+               pthread_mutex_unlock(&ic_control_info.lock);
+       }
+
+       /* nothing to return */
+       return NULL;
+}
+
+/*
+ * handleMismatch
+ *             If the mismatched packet is from an old connection, we may need 
to
+ *             send an acknowledgment.
+ *
+ * We are called with the receiver-lock held, and we never release it.
+ *
+ * For QD:
+ * 1) Not in hashtable     : NAK it/Do nothing
+ *       Causes:  a) Start race
+ *                b) Before the entry for the ic instance is inserted, an 
error happened.
+ *                c) From past transactions: should no happen.
+ * 2) Active in hashtable  : NAK it/Do nothing
+ *    Causes:  a) Error reported after the entry is inserted, and connections 
are
+ *                not inserted to the hashtable yet, and before teardown is 
called.
+ * 3) Inactive in hashtable: ACK it (with stop)
+ *    Causes: a) Normal execution: after teardown is called on current command.
+ *            b) Error case, 2a) after teardown is called.
+ *            c) Normal execution: from past history transactions (should not 
happen).
+ *
+ * For QE:
+ * 1) pkt->id > ic_control_info.ic_instance_id : NAK it/Do nothing
+ *    Causes: a) Start race
+ *            b) Before ic_control_info.ic_instance_id is assigned to correct 
value, an error happened.
+ * 2) lastTornIcId < pkt->id == ic_control_info.ic_instance_id: NAK it/Do 
nothing
+ *    Causes:  a) Error reported after ic_control_info.ic_instance_id is set, 
and connections are
+ *                not inserted to the hashtable yet, and before teardown is 
called.
+ * 3) lastTornIcId == pkt->id == ic_control_info.ic_instance_id: ACK it (with 
stop)
+ *    Causes:  a) Normal execution: after teardown is called on current command
+ * 4) pkt->id < ic_control_info.ic_instance_id: NAK it/Do nothing/ACK it.
+ *    Causes:  a) Should not happen.
+ *
+ */
+static bool
+handleMismatch(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
+{
+       bool            cached = false;
+
+       /*
+        * we want to ack old packets; but *must* avoid acking connection
+        * requests:
+        *
+        * "ACK the past, NAK the future" explicit NAKs aren't necessary, we 
just
+        * don't want to ACK future packets, that confuses everyone.
+        */
+       if (pkt->seq > 0 && pkt->sessionId == session_param.gp_session_id)
+       {
+               bool            need_ack = false;
+               uint8           ack_flags = 0;
+
+               /*
+                * The QD-backends can't use a counter, they've potentially got
+                * multiple instances (one for each active cursor)
+                */
+               if (global_param.Gp_role == GP_ROLE_DISPATCH_IC)
+               {
+                       struct CursorICHistoryEntry *p;
+
+                       p = rx_control_info.cursorHistoryTable.get(pkt->icId);
+                       if (p)
+                       {
+                               if (p->status == 0)
+                               {
+                                       /* Torn down. Ack the past. */
+                                       need_ack = true;
+                               }
+                               else                    /* p->status == 1 */
+                               {
+                                       /*
+                                        * Not torn down yet. It happens when 
an error
+                                        * (out-of-memory, network error...) 
occurred after the
+                                        * cursor entry is inserted into the 
table in interconnect
+                                        * setup process. The peer will be 
canceled.
+                                        */
+                                       if (IC_DEBUG1 >= 
session_param.log_min_messages)
+                                               LOG(DEBUG1, "GOT A MISMATCH 
PACKET WITH ID %d HISTORY THINKS IT IS ACTIVE", pkt->icId);
+                                       return cached;  /* ignore, no ack */
+                               }
+                       }
+                       else
+                       {
+                               if (IC_DEBUG1 >= session_param.log_min_messages)
+                                       LOG(DEBUG1, "GOT A MISMATCH PACKET WITH 
ID %d HISTORY HAS NO RECORD", pkt->icId);
+
+                               /*
+                                * No record means that two possibilities. 1) 
It is from the
+                                * future. It is due to startup race. We do not 
ack future
+                                * packets 2) Before the entry for the ic 
instance is
+                                * inserted, an error happened. We do not ack 
for this case
+                                * too. The peer will be canceled.
+                                */
+                               ack_flags = UDPIC_FLAGS_NAK;
+                               need_ack = false;
+
+                               if 
(session_param.gp_interconnect_cache_future_packets)
+                               {
+                                       cached = cacheFuturePacket(pkt, peer, 
peer_len);
+                               }
+                       }
+               }
+               /* The QEs get to use a simple counter. */
+               else if (global_param.Gp_role == GP_ROLE_EXECUTE_IC)
+               {
+                       if (ic_control_info.ic_instance_id >= pkt->icId)
+                       {
+                               need_ack = true;
+
+                               /*
+                                * We want to "ACK the past, but NAK the 
future."
+                                *
+                                * handleAck() will retransmit.
+                                */
+                               if (pkt->seq >= 1 && pkt->icId > 
rx_control_info.lastTornIcId)
+                               {
+                                       ack_flags = UDPIC_FLAGS_NAK;
+                                       need_ack = false;
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * ic_control_info.ic_instance_id < pkt->icId, 
from the future
+                                */ 
+                               if 
(session_param.gp_interconnect_cache_future_packets)
+                               {
+                                       cached = cacheFuturePacket(pkt, peer, 
peer_len);
+                               }
+                       }
+               }
+
+               if (need_ack)
+               {
+                       UDPConn dummyconn(NULL);
+                       char            buf[128];       /* numeric IP addresses 
shouldn't exceed
+                                                                        * 
about 50 chars, but play it safe */
+
+                       memcpy(&dummyconn.conn_info, pkt, sizeof(icpkthdr));
+                       dummyconn.peer = *peer;
+                       dummyconn.peer_len = peer_len;
+
+                       dummyconn.conn_info.flags |= ack_flags;
+
+                       if (IC_DEBUG1 >= session_param.log_min_messages)
+                               LOG(DEBUG1, "ACKING PACKET WITH FLAGS: pkt->seq 
%d 0x%x [pkt->icId %d last-teardown %d interconnect_id %d]",
+                                                       pkt->seq, 
dummyconn.conn_info.flags, pkt->icId, rx_control_info.lastTornIcId, 
ic_control_info.ic_instance_id);
+
+                       format_sockaddr_udp(&dummyconn.peer, buf, sizeof(buf));
+
+                       if (IC_DEBUG1 >= session_param.log_min_messages)
+                               LOG(DEBUG1, "ACKING PACKET TO %s", buf);
+
+                       if ((ack_flags & UDPIC_FLAGS_NAK) == 0)
+                       {
+                               ack_flags |= UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK 
| UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_RECEIVER_TO_SENDER;
+                       }
+                       else
+                       {
+                               ack_flags |= UDPIC_FLAGS_RECEIVER_TO_SENDER;
+                       }
+
+                       /*
+                        * There are two cases, we may need to send a response 
to sender
+                        * here. One is start race and the other is receiver 
becomes idle.
+                        *
+                        * ack_flags here can take two possible values 1) 
UDPIC_FLAGS_NAK
+                        * | UDPIC_FLAGS_RECEIVER_TO_SENDER (for start race) 2)
+                        * UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | 
UDPIC_FLAGS_CAPACITY |
+                        * UDPIC_FLAGS_RECEIVER_TO_SENDER (for idle receiver)
+                        *
+                        * The final flags in the packet may take some extra 
bits such as
+                        * 1) UDPIC_FLAGS_STOP 2) UDPIC_FLAGS_EOS 3) 
UDPIC_FLAGS_CAPACITY
+                        * which are from original packet
+                        */
+                       dummyconn.sendAck(ack_flags | 
dummyconn.conn_info.flags, dummyconn.conn_info.seq, dummyconn.conn_info.seq);
+               }
+       }
+       else
+       {
+               if (IC_DEBUG1 >= session_param.log_min_messages)
+                       LOG(DEBUG1, "dropping packet from command-id %d seq %d 
(my cmd %d)", pkt->icId, pkt->seq, ic_control_info.ic_instance_id);
+       }
+
+       return cached;
+}
+
+/*
+ * cacheFuturePacket
+ *             Cache the future packets during the setupUDPIFCInterconnect.
+ *
+ * Return true if packet is cached, otherwise false
+ */
+static bool
+cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, int peer_len)
+{
+       UDPConn *conn = ic_control_info.startupCacheHtab.find(pkt);
+       if (conn == NULL)
+       {
+               try {
+                       conn = new UDPConn(NULL);
+               } catch (const std::bad_alloc & e) {
+                       errno = ENOMEM;
+                       setRxThreadError(errno);
+                       return false;
+               }
+
+               memset((void *) conn, 0, sizeof(UDPConn));
+               memcpy(&conn->conn_info, pkt, sizeof(icpkthdr));
+
+               conn->pkt_q_capacity = 
session_param.Gp_interconnect_queue_depth;
+               conn->pkt_q_size = session_param.Gp_interconnect_queue_depth;
+               conn->pkt_q = (uint8 **) 
ic_malloc(session_param.Gp_interconnect_queue_depth * sizeof(uint8 *));
+
+               if (conn->pkt_q == NULL)
+               {
+                       /* malloc failed.  */
+                       delete conn;
+                       setRxThreadError(errno);
+                       return false;
+               }
+
+               /* We only use the array to store cached packets. */
+               memset(conn->pkt_q, 0, 
session_param.Gp_interconnect_queue_depth * sizeof(uint8 *));
+
+               /* Put connection to the hashtable. */
+               if (!ic_control_info.startupCacheHtab.add(conn))
+               {
+                       ic_free(conn->pkt_q);
+                       delete conn;
+                       setRxThreadError(errno);
+                       return false;
+               }
+
+               /* Setup the peer sock information. */
+               memcpy(&conn->peer, peer, peer_len);
+               conn->peer_len = peer_len;
+       }
+
+       /*
+        * Reject packets with invalid sequence numbers and packets which have
+        * been cached before.
+        */
+       if (pkt->seq > conn->pkt_q_size || pkt->seq == 0 || 
conn->pkt_q[pkt->seq - 1] != NULL)
+               return false;
+
+       conn->pkt_q[pkt->seq - 1] = (uint8 *) pkt;
+       rx_buffer_pool.maxCount++;
+       ic_statistics.startupCachedPktNum++;
+
+       return true;
+}
+
+/*
+ * cleanupStartupCache
+ *             Clean the startup cache.
+ */
+static void
+cleanupStartupCache()
+{
+       ConnHtabBin *bin = NULL;
+       UDPConn *cachedConn = NULL;
+       icpkthdr   *pkt = NULL;
+       int                     i = 0;
+       uint32          j = 0;
+
+       for (i = 0; i < ic_control_info.startupCacheHtab.size; i++)
+       {
+               bin = ic_control_info.startupCacheHtab.table[i];
+
+               while (bin)
+               {
+                       cachedConn = bin->conn;
+
+                       for (j = 0; j < cachedConn->pkt_q_size; j++)
+                       {
+                               pkt = (icpkthdr *) cachedConn->pkt_q[j];
+
+                               if (pkt == NULL)
+                                       continue;
+
+                               rx_buffer_pool.maxCount--;
+                               rx_buffer_pool.put(pkt);
+                               cachedConn->pkt_q[j] = NULL;
+                       }
+                       bin = bin->next;
+                       ic_control_info.startupCacheHtab.remove(cachedConn);
+
+                       /*
+                        * MPP-19981 free the cached connections; otherwise 
memory leak
+                        * would be introduced.
+                        */
+                       ic_free(cachedConn->pkt_q);
+                       ic_free(cachedConn);
+               }
+       }
+}
+
+
+#ifdef USE_ASSERT_CHECKING
+
+/* The following functions are facility methods for debugging.
+ * They are quite useful when there are a large number of connections.
+ * These functions can be called from gdb to output internal information to a 
file.
+ */
+
+/*
+ * dumpUnackQueueRing
+ *             Dump an unack queue ring.
+ */
+static void
+dumpUnackQueueRing(const char *fname)
+{
+       FILE       *ofile = fopen(fname, "w+");
+       int                     i;
+
+       fprintf(ofile, "UnackQueueRing: currentTime %lu, idx %d numOutstanding 
%d numSharedOutstanding %d\n",
+                       unack_queue_ring.currentTime, unack_queue_ring.idx,
+                       unack_queue_ring.numOutStanding, 
unack_queue_ring.numSharedOutStanding);
+       fprintf(ofile, "==================================\n");
+       for (i = 0; i < UNACK_QUEUE_RING_SLOTS_NUM; i++)
+       {
+               if (unack_queue_ring.slots[i].length() > 0)
+               {
+                       unack_queue_ring.slots[i].dump_to_file(ofile);
+               }
+       }
+
+       fclose(ofile);
+}
+
+/*
+ * dumpConnections
+ *             Dump connections.
+ */
+void
+TransportEntry::dumpConnections(const char *fname)
+{
+       int                     i;
+       uint32          j;
+
+       return;
+
+       FILE       *ofile = fopen(fname, "w+");
+
+       fprintf(ofile, "Entry connections: conn num %d \n", this->numConns);
+       fprintf(ofile, "==================================\n");
+       for (i = 0; i < this->numConns; i++)
+       {
+               UDPConn *conn = this->GetConn(i);
+
+               fprintf(ofile, "conns[%d] motNodeId=%d: remoteContentId=%d 
pid=%d sockfd=%d remote=%s "
+                               "capacity=%d sentSeq=%d receivedAckSeq=%d 
consumedSeq=%d rtt=%lu"
+                               " dev=%lu deadlockCheckBeginTime=%lu route=%d 
msgSize=%d msgPos=%p"
+                               " recvBytes=%d tupleCount=%d stillActive=%d 
stopRequested=%d "
+                               "state=%d\n",
+                               i, this->motNodeId,
+                               conn->remoteContentId,
+                               conn->cdbProc ? conn->cdbProc->pid : 0,
+                               conn->sockfd,
+                               conn->remoteHostAndPort,
+                               conn->capacity, conn->sentSeq, 
conn->receivedAckSeq, conn->consumedSeq,
+                               conn->rtt, conn->dev, 
conn->deadlockCheckBeginTime, conn->route, conn->msgSize, conn->msgPos,
+                               conn->recvBytes, conn->tupleCount, 
conn->stillActive, conn->stopRequested,
+                               conn->state);
+               fprintf(ofile, "conn_info [%s: seq %d extraSeq %d]: motNodeId 
%d, crc %d len %d "
+                               "srcContentId %d dstDesContentId %d "
+                               "srcPid %d dstPid %d "
+                               "srcListenerPort %d dstListernerPort %d "
+                               "sendSliceIndex %d recvSliceIndex %d "
+                               "sessionId %d icId %d "
+                               "flags %d\n",
+                               conn->conn_info.flags & 
UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
+                               conn->conn_info.seq, conn->conn_info.extraSeq, 
conn->conn_info.motNodeId, conn->conn_info.crc, conn->conn_info.len,
+                               conn->conn_info.srcContentId, 
conn->conn_info.dstContentId,
+                               conn->conn_info.srcPid, conn->conn_info.dstPid,
+                               conn->conn_info.srcListenerPort, 
conn->conn_info.dstListenerPort,
+                               conn->conn_info.sendSliceIndex, 
conn->conn_info.recvSliceIndex,
+                               conn->conn_info.sessionId, conn->conn_info.icId,
+                               conn->conn_info.flags);
+
+               if (!ic_control_info.isSender)
+               {
+                       fprintf(ofile, "pkt_q_size=%d pkt_q_head=%d 
pkt_q_tail=%d pkt_q=%p\n", conn->pkt_q_size, conn->pkt_q_head, 
conn->pkt_q_tail, conn->pkt_q);
+                       for (j = 0; j < conn->pkt_q_capacity; j++)
+                       {
+                               if (conn->pkt_q != NULL && conn->pkt_q[j] != 
NULL)
+                               {
+                                       icpkthdr   *pkt = (icpkthdr *) 
conn->pkt_q[j];
+
+                                       fprintf(ofile, "Packet (pos %d) Info 
[%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
+                                                       "srcContentId %d 
dstDesContentId %d "
+                                                       "srcPid %d dstPid %d "
+                                                       "srcListenerPort %d 
dstListernerPort %d "
+                                                       "sendSliceIndex %d 
recvSliceIndex %d "
+                                                       "sessionId %d icId %d "
+                                                       "flags %d\n",
+                                                       j,
+                                                       pkt->flags & 
UDPIC_FLAGS_RECEIVER_TO_SENDER ? "ACK" : "DATA",
+                                                       pkt->seq, 
pkt->extraSeq, pkt->motNodeId, pkt->crc, pkt->len,
+                                                       pkt->srcContentId, 
pkt->dstContentId,
+                                                       pkt->srcPid, 
pkt->dstPid,
+                                                       pkt->srcListenerPort, 
pkt->dstListenerPort,
+                                                       pkt->sendSliceIndex, 
pkt->recvSliceIndex,
+                                                       pkt->sessionId, 
pkt->icId,
+                                                       pkt->flags);
+                               }
+                       }
+               }
+               if (ic_control_info.isSender)
+               {
+                       fprintf(ofile, "sndQueue ");
+                       conn->sndQueue.dump_to_file(ofile);
+                       fprintf(ofile, "unackQueue ");
+                       conn->unackQueue.dump_to_file(ofile);
+
+                       dumpUnackQueueRing("/tmp/dumpUnackQueueRing");
+               }
+               fprintf(ofile, "\n");
+       }
+       fclose(ofile);
+}
+#endif
+
+/*
+ * logPkt
+ *             Log a packet.
+ *
+ */
+static inline void
+logPkt(const char *prefix, icpkthdr *pkt)
+{
+       LOG(INFO, "%s [%s: seq %d extraSeq %d]: motNodeId %d, crc %d len %d "
+                         "srcContentId %d dstDesContentId %d "
+                         "srcPid %d dstPid %d "
+                         "srcListenerPort %d dstListernerPort %d "
+                         "sendSliceIndex %d recvSliceIndex %d "
+                         "sessionId %d icId %d "
+                         "flags %d ",
+                         prefix, pkt->flags & UDPIC_FLAGS_RECEIVER_TO_SENDER ? 
"ACK" : "DATA",
+                         pkt->seq, pkt->extraSeq, pkt->motNodeId, pkt->crc, 
pkt->len,
+                         pkt->srcContentId, pkt->dstContentId,
+                         pkt->srcPid, pkt->dstPid,
+                         pkt->srcListenerPort, pkt->dstListenerPort,
+                         pkt->sendSliceIndex, pkt->recvSliceIndex,
+                         pkt->sessionId, pkt->icId,
+                         pkt->flags);
+}
+
+/*
+ * Send a dummy packet to interconnect thread to exit poll() immediately
+ */
+static void
+SendDummyPacket(void)
+{
+   int                 ret;
+   const char          *dummy_pkt = "stop it";
+   int                 counter;
+   struct sockaddr_storage dest;
+   socklen_t   dest_len;
+
+   Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || 
udp_dummy_packet_sockaddr.ss_family == AF_INET6);
+   Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6);
+
+   dest = udp_dummy_packet_sockaddr;
+   dest_len = (ICSenderFamily == AF_INET) ? sizeof(struct sockaddr_in) : 
sizeof(struct sockaddr_in6);
+
+   if (ICSenderFamily == AF_INET6)
+   {
+#if defined(__darwin__)
+       if (udp_dummy_packet_sockaddr.ss_family == AF_INET6)
+           ConvertIPv6WildcardToLoopback(&dest);
+#endif
+       if (udp_dummy_packet_sockaddr.ss_family == AF_INET)
+           ConvertToIPv4MappedAddr(&dest, &dest_len);
+   }
+
+   if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == 
AF_INET6)
+   {
+       /* the size of AF_INET6 is bigger than the side of IPv4, so
+        * converting from IPv6 to IPv4 may potentially not work. */
+       LOG(INFO, "sending dummy packet failed: cannot send from AF_INET to 
receiving on AF_INET6");
+       return;
+   }
+
+       /*
+     * Send a dummy package to the interconnect listener, try 10 times.
+     * We don't want to close the socket at the end of this function, since
+     * the socket will eventually close during the motion layer cleanup.
+        */
+
+       counter = 0;
+       while (counter < 10)
+       {
+               counter++;
+               ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, 
(struct sockaddr *) &dest, dest_len);
+               if (ret < 0)
+               {
+                       if (errno == EINTR || errno == EAGAIN || errno == 
EWOULDBLOCK)
+                               continue;
+                       else
+                       {
+                               LOG(INFO, "send dummy packet failed, sendto 
failed: %m");
+                               return;
+                       }
+               }
+               break;
+       }
+
+       if (counter >= 10)
+       {
+               LOG(INFO, "send dummy packet failed, sendto failed with 10 
times: %m");
+       }
+}
+
+/*
+ * prepareXmit
+ *             Prepare connection for transmit.
+ */
+void
+UDPConn::prepareXmit()
+{
+       this->conn_info.len = this->msgSize;
+       this->conn_info.crc = 0;
+
+       memcpy(this->pBuff, &this->conn_info, sizeof(this->conn_info));
+
+       /* increase the sequence no */
+       this->conn_info.seq++;
+
+       if (session_param.gp_interconnect_full_crc)
+       {
+               icpkthdr *pkt = (icpkthdr *)this->pBuff;
+               addCRC(pkt);
+       }
+}
+
+ /*
+  * sendtoWithRetry
+  *         Retry sendto logic and send the packets.
+  */
+static ssize_t
+sendtoWithRetry(int socket, const void *message, size_t length,
+           int flags, const struct sockaddr *dest_addr,
+           socklen_t dest_len, int retry, const char *errDetail)
+{
+    int32       n;
+    int count = 0;
+
+xmit_retry:
+    /*
+     * If given retry count is positive, retry up to the limited times.
+     * Otherwise, retry for unlimited times until succeed.
+     */
+    if (retry > 0 && ++count > retry)
+       return n;
+       n = sendto(socket, message, length, flags, dest_addr, dest_len);
+    if (n < 0)
+    {
+               int                     save_errno = errno;
+
+               if (errno == EINTR)
+                       goto xmit_retry;
+
+        /*
+         * EAGAIN: no space ? not an error.
+         *
+         * EFAULT: In Linux system call, it only happens when copying a socket
+         * address into kernel space failed, which is less likely to happen,
+         * but mocked heavily by our fault injection in regression tests.
+         */
+        if (errno == EAGAIN || errno == EFAULT)
+            return n;
+
+               /*
+                * If Linux iptables (nf_conntrack?) drops an outgoing packet, 
it may
+                * return an EPERM to the application. This might be simply 
because of
+                * traffic shaping or congestion, so ignore it.
+                */
+               if (errno == EPERM)
+               {
+                       LOG(LOG_ERROR, "Interconnect error writing an outgoing 
packet: %m, "
+                                                  "error during sendto() %s", 
errDetail);
+                       return n;
+               }
+
+               /*
+                * If the OS can detect an MTU issue on the host network 
interfaces, we
+                * would get EMSGSIZE here. So, bail with a HINT about checking 
MTU.
+                */
+               if (errno == EMSGSIZE)
+               {
+                       std::stringstream ss;
+                       ss << "ERROR, Interconnect error writing an outgoing 
packet: " << strerror(errno) << "error during sendto() call (error:" << 
save_errno << ", " << errDetail << ")."
+                          << "check if interface MTU is equal across the 
cluster and lower than gp_max_packet_size" << "\n";
+                       throw ICNetworkException(ss.str(), __FILE__, __LINE__);
+               }
+
+               std::stringstream ss;
+               ss <<"ERROR, Interconnect error writing an outgoing packet: 
"<<strerror(errno)<<
+                       "error during sendto() call (error:"<<save_errno<<", 
"<<errDetail << ").\n";
+               throw ICNetworkException(ss.str(), __FILE__, __LINE__);
+               /* not reached */
+       }
+
+       return n;
+}
+
+/*
+ * sendOnce
+ *             Send a packet.
+ */
+void
+UDPConn::sendOnce(icpkthdr *pkt)
+{
+       int32 n;
+#ifdef USE_ASSERT_CHECKING
+       if (testmode_inject_fault(session_param.gp_udpic_dropxmit_percent))
+       {
+#ifdef AMS_VERBOSE_LOGGING
+               LOG(INFO, "THROW PKT with seq %d srcpid %d despid %d",
+                                 pkt->seq, pkt->srcPid, pkt->dstPid);
+#endif
+               return;
+       }
+#endif
+
+       Assert(pkt->srcContentId == global_param.segindex);
+       Assert(pkt->motNodeId == entry_->motNodeId);
+       LOG(DEBUG3, "UDPConn::sendOnce(): icid: %d, motNodeId: %d, srcSeg: %d, 
dstSeg: %d, srcPid: %d, dstPid: %d, seq: %d, len: %d, flags: %s",
+                       pkt->icId, pkt->motNodeId, pkt->srcContentId, 
pkt->dstContentId, pkt->srcPid, pkt->dstPid, pkt->seq, pkt->len, 
flags2txt(pkt->flags));
+
+    char errDetail[256];
+    snprintf(errDetail, sizeof(errDetail), "For Remote Connection: 
contentId=%d at %s",
+                      this->remoteContentId,
+                      this->remoteHostAndPort);
+    n = sendtoWithRetry(this->entry_->txfd, pkt, pkt->len, 0,
+                           (struct sockaddr *) &this->peer, this->peer_len, 
-1, errDetail);
+       if (n != int(pkt->len))
+       {
+               if (IC_DEBUG1 >= session_param.log_min_messages)
+                       LOG(DEBUG1, "Interconnect error writing an outgoing 
packet [seq %d]: short transmit (given %d sent %d) during sendto() call."
+                                               "For Remote Connection: 
contentId=%d at %s", pkt->seq, pkt->len, n,
+                                               this->remoteContentId, 
this->remoteHostAndPort);
+#ifdef AMS_VERBOSE_LOGGING
+               logPkt("PKT DETAILS ", pkt);
+#endif
+       }
+       return;
+}
+
+void
+UDPConn::handleStop()
+{
+       if (!this->stillActive || !this->stopRequested)
+               return;
+
+       /* mark buffer empty */
+       this->tupleCount = 0;
+       this->msgSize = sizeof(this->conn_info);
+
+       /* now send our stop-ack EOS */
+       this->conn_info.flags |= UDPIC_FLAGS_EOS;
+
+       Assert(this->curBuff != NULL);
+
+       this->pBuff[this->msgSize] = 'S';
+       this->msgSize += 1;
+
+       /* now ready to actually send */
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+               LOG(DEBUG1, "handleStopMsgs: node %d route %d, seq %d",
+                                       entry_->motNodeId, this->route, 
this->conn_info.seq);
+
+       /* place it into the send queue */
+       this->prepareXmit();
+       this->sndQueue.append(this->curBuff);
+       this->curBuff = NULL;
+       this->pBuff = NULL;
+
+       /* return all buffers */
+       this->sndQueue.release(false);
+       this->unackQueue.release(session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_CAPACITY_IC ? false : true);
+
+       this->tupleCount = 0;
+       this->msgSize = sizeof(this->conn_info);
+
+       this->state = mcsEosSent;
+       this->stillActive = false;
+       this->stopRequested = false;
+}
+
+/*
+ * sendBuffers
+ *             Called by sender to send the buffers in the send queue.
+ *
+ * Send the buffers in the send queue of the connection if there is capacity 
left
+ * and the congestion control condition is satisfied.
+ *
+ * Here, we make sure that a connection can have at least one outstanding 
buffer.
+ * This is very important for two reasons:
+ *
+ * 1) The handling logic of the ack of the outstanding buffer can always send 
a buffer
+ *    in the send queue. Otherwise, there may be a deadlock.
+ * 2) This makes sure that any connection can have a minimum bandwidth for data
+ *    sending.
+ *
+ * After sending a buffer, the buffer will be placed into both the unack queue 
and
+ * the corresponding queue in the unack queue ring.
+ */
+void
+UDPConn::sendBuffers()
+{
+       while (this->capacity > 0 && this->sndQueue.length() > 0)
+       {
+               ICBuffer   *buf = NULL;
+
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+               {
+                       if (this->unackQueue.length() > 0 &&
+                               unack_queue_ring.numSharedOutStanding >= 
(snd_control_info.cwnd - snd_control_info.minCwnd))
+                       break;
+               }
+
+               /* for connection setup, we only allow one outstanding packet. 
*/
+               if (this->state == mcsSetupOutgoingConnection && 
this->unackQueue.length() >= 1)
+                       break;
+
+               buf = this->sndQueue.pop();
+
+               uint64          now = getCurrentTime();
+
+               buf->sentTime = now;
+               buf->unackQueueRingSlot = -1;
+               buf->nRetry = 0;
+               buf->conn = this;
+               this->capacity--;
+
+               this->unackQueue.append(buf);
+
+               if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+               {
+                       unack_queue_ring.numOutStanding++;
+                       if (this->unackQueue.length() > 1)
+                               unack_queue_ring.numSharedOutStanding++;
+
+                       putIntoUnackQueueRing(&unack_queue_ring,
+                                                                 buf,
+                                                                 
this->computeExpirationPeriod(buf->nRetry),
+                                                                 now);
+               }
+
+               /*
+                * Note the place of sendOnce here. If we send before appending 
it to
+                * the unack queue and putting it into unack queue ring, and 
there is
+                * a network error occurred in the sendOnce function, error 
message
+                * will be output. In the time of error message output, 
interrupts is
+                * potentially checked, if there is a pending query cancel, it 
will
+                * lead to a dangled buffer (memory leak).
+                */
+#ifdef TRANSFER_PROTOCOL_STATS
+               trans_proto_stats.update(TPE_DATA_PKT_SEND, buf->pkt);
+#endif
+
+               this->sendOnce(buf->pkt);
+
+               ic_statistics.sndPktNum++;
+
+#ifdef AMS_VERBOSE_LOGGING
+               logPkt("SEND PKT DETAIL", buf->pkt);
+#endif
+
+               this->sentSeq = buf->pkt->seq;
+       }
+}
+
+/*
+ * handleDisorderPacket
+ *             Called by rx thread to assemble and send a disorder message.
+ *
+ * In current implementation, we limit the number of lost packet sequence 
numbers
+ * in the disorder message by the MIN_PACKET_SIZE. There are two reasons here:
+ *
+ * 1) The maximal number of lost packet sequence numbers are actually bounded 
by the
+ *    receive queue depth whose maximal value is very large. Since we share 
the packet
+ *    receive and ack receive in the background thread, the size of disorder 
should be
+ *    also limited by the max packet size.
+ * 2) We can use Gp_max_packet_size here to limit the number of lost packet 
sequence numbers.
+ *    But considering we do not want to let senders send many packets when 
getting a lost
+ *    message. Here we use MIN_PACKET_SIZE.
+ *
+ *
+ * the format of a disorder message:
+ * I) pkt header
+ *  - seq      -> packet sequence number that triggers the disorder message
+ *  - extraSeq -> the largest seq of the received packets
+ *  - flags    -> UDPIC_FLAGS_DISORDER
+ *  - len      -> sizeof(icpkthdr) + sizeof(uint32) * (lost pkt count)
+ * II) content
+ *  - an array of lost pkt sequence numbers (uint32)
+ *
+ */
+void
+UDPConn::handleDisorderPacket(int pos, uint32 tailSeq, icpkthdr *pkt)
+{
+       int                     start = 0;
+       uint32          lostPktCnt = 0;
+       uint32     *curSeq = (uint32 *) &rx_control_info.disorderBuffer[1];
+       uint32          maxSeqs = MAX_SEQS_IN_DISORDER_ACK;
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "PROCESS_DISORDER PKT BEGIN:");
+#endif
+
+       start = this->pkt_q_tail;
+
+       while (start != pos && lostPktCnt < maxSeqs)
+       {
+               if (this->pkt_q[start] == NULL)
+               {
+                       *curSeq = tailSeq;
+                       lostPktCnt++;
+                       curSeq++;
+
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "PROCESS_DISORDER add seq [%d], lostPktCnt 
%d", *curSeq, lostPktCnt);
+#endif
+               }
+
+               tailSeq++;
+               start = (start + 1) % this->pkt_q_capacity;
+       }
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "PROCESS_DISORDER PKT END:");
+#endif
+
+       /* when reaching here, cnt must not be 0 */
+       this->sendDisorderAck(pkt->seq, this->conn_info.seq - 1, lostPktCnt);
+}
+
+/*
+ * handleAckForDisorderPkt
+ *             Called by sender to deal with acks for disorder packet.
+ */
+bool
+UDPConn::handleAckForDisorderPkt(icpkthdr *pkt)
+{
+       ICBufferLink *link = NULL;
+       ICBuffer   *buf = NULL;
+       ICBufferLink *next = NULL;
+       uint64          now = getCurrentTime();
+       uint32     *curLostPktSeq = 0;
+       int                     lostPktCnt = 0;
+       static uint32 times = 0;
+       static uint32 lastSeq = 0;
+       bool            shouldSendBuffers = false;
+
+       if (pkt->extraSeq != lastSeq)
+       {
+               lastSeq = pkt->extraSeq;
+               times = 0;
+               return false;
+       }
+       else
+       {
+               times++;
+               if (times != 2)
+                       return false;
+       }
+
+       curLostPktSeq = (uint32 *) &pkt[1];
+       lostPktCnt = (pkt->len - sizeof(icpkthdr)) / sizeof(uint32);
+
+       /*
+        * Resend all the missed packets and remove received packets from queues
+        */
+
+       link = this->unackQueue.first();
+       buf = GET_ICBUFFER_FROM_PRIMARY(link);
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "DISORDER: pktlen %d cnt %d pktseq %d first loss %d buf %p",
+                         pkt->len, lostPktCnt, pkt->seq, *curLostPktSeq, buf);
+
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+       {
+               this->unackQueue.icBufferListLog();
+               this->sndQueue.icBufferListLog();
+       }
+#endif
+
+       /*
+        * iterate the unack queue
+        */
+       while (!this->unackQueue.is_head(link) && buf->pkt->seq <= pkt->seq && 
lostPktCnt > 0)
+       {
+#ifdef AMS_VERBOSE_LOGGING
+               LOG(INFO, "DISORDER: bufseq %d curlostpkt %d cnt %d buf %p 
pkt->seq %d",
+                                 buf->pkt->seq, *curLostPktSeq, lostPktCnt, 
buf, pkt->seq);
+#endif
+
+               if (buf->pkt->seq == pkt->seq)
+               {
+                       this->handleAckedPacket(buf, now);
+                       shouldSendBuffers = true;
+                       break;
+               }
+
+               if (buf->pkt->seq == *curLostPktSeq)
+               {
+                       /* this is a lost packet, retransmit */
+
+                       buf->nRetry++;
+                       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+                       {
+                               ICBufferList *alist = 
&unack_queue_ring.slots[buf->unackQueueRingSlot];
+                               buf = alist->remove(buf);
+                               putIntoUnackQueueRing(&unack_queue_ring, buf,
+                                                                         
this->computeExpirationPeriod(buf->nRetry), now);
+                       }
+#ifdef TRANSFER_PROTOCOL_STATS
+                       trans_proto_stats.update(TPE_DATA_PKT_SEND, buf->pkt);
+#endif
+
+                       Assert(this == buf->conn);
+                       this->sendOnce(buf->pkt);
+
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "RESEND a buffer for DISORDER: seq %d", 
buf->pkt->seq);
+                       logPkt("DISORDER RESEND DETAIL ", buf->pkt);
+#endif
+
+                       ic_statistics.retransmits++;
+                       curLostPktSeq++;
+                       lostPktCnt--;
+
+                       link = link->next;
+                       buf = GET_ICBUFFER_FROM_PRIMARY(link);
+               }
+               else if (buf->pkt->seq < *curLostPktSeq)
+               {
+                       /* remove packet already received. */
+
+                       next = link->next;
+                       this->handleAckedPacket(buf, now);
+                       shouldSendBuffers = true;
+                       link = next;
+                       buf = GET_ICBUFFER_FROM_PRIMARY(link);
+               }
+               else                                    /* buf->pkt->seq > 
*curPktSeq */
+               {
+                       /*
+                        * this case is introduced when the disorder message 
tell you a
+                        * pkt is lost. But when we handle this message, a 
message (for
+                        * example, duplicate ack, or another disorder message) 
arriving
+                        * before this message already removed the pkt.
+                        */
+                       curLostPktSeq++;
+                       lostPktCnt--;
+               }
+       }
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+       {
+               snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, 
snd_control_info.minCwnd);
+               snd_control_info.cwnd = snd_control_info.ssthresh;
+       }
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "After DISORDER: sndQ %d unackQ %d", this->sndQueue.length(), 
this->unackQueue.length());
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC)
+       {
+               this->unackQueue.icBufferListLog();
+               this->sndQueue.icBufferListLog();
+       }
+#endif
+
+       return shouldSendBuffers;
+}
+
+/*
+ * handleAckForDuplicatePkt
+ *             Called by sender to deal with acks for duplicate packet.
+ *
+ */
+bool
+UDPConn::handleAckForDuplicatePkt(icpkthdr *pkt)
+{
+       ICBufferLink *link = NULL;
+       ICBuffer   *buf = NULL;
+       ICBufferLink *next = NULL;
+       uint64          now = getCurrentTime();
+       bool            shouldSendBuffers = false;
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(INFO, "RESEND the unacked buffers in the queue due to %s", pkt->len 
== 0 ? "PROCESS_START_RACE" : "DISORDER");
+#endif
+
+       if (pkt->seq <= pkt->extraSeq)
+       {
+               /* Indicate a bug here. */
+               LOG(LOG_ERROR, "invalid duplicate message: seq %d extraSeq %d", 
pkt->seq, pkt->extraSeq);
+               return false;
+       }
+
+       link = this->unackQueue.first();
+       buf = GET_ICBUFFER_FROM_PRIMARY(link);
+
+       /* deal with continuous pkts */
+       while (!this->unackQueue.is_head(link) && (buf->pkt->seq <= 
pkt->extraSeq))
+       {
+               next = link->next;
+               this->handleAckedPacket(buf, now);
+               shouldSendBuffers = true;
+               link = next;
+               buf = GET_ICBUFFER_FROM_PRIMARY(link);
+       }
+
+       /* deal with the single duplicate packet */
+       while (!this->unackQueue.is_head(link) && buf->pkt->seq <= pkt->seq)
+       {
+               next = link->next;
+               if (buf->pkt->seq == pkt->seq)
+               {
+                       this->handleAckedPacket(buf, now);
+                       shouldSendBuffers = true;
+                       break;
+               }
+               link = next;
+               buf = GET_ICBUFFER_FROM_PRIMARY(link);
+       }
+
+       return shouldSendBuffers;
+}
+
+/*
+ * checkNetworkTimeout
+ *             check network timeout case.
+ */
+void
+UDPConn::checkNetworkTimeout(ICBuffer *buf, uint64 now, bool 
*networkTimeoutIsLogged)
+{
+       /*
+        * Using only the time to first sent time to decide timeout is not 
enough,
+        * since there is a possibility the sender process is not scheduled or
+        * blocked by OS for a long time. In this case, only a few times are
+        * tried. Thus, the GUC Gp_interconnect_min_retries_before_timeout is
+        * added here.
+        */
+       if (session_param.gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG_IC &&
+               buf->nRetry % 
session_param.Gp_interconnect_debug_retry_interval == 0)
+       {
+               LOG(INFO, "resending packet (seq %d) to %s (pid %d cid %d) with 
%d retries in %lu seconds",
+                                 buf->pkt->seq, buf->conn->remoteHostAndPort, 
buf->pkt->dstPid, buf->pkt->dstContentId, buf->nRetry,
+                                 (now - buf->sentTime) / 1000 / 1000);
+       }
+
+       if ((buf->nRetry > 
session_param.Gp_interconnect_min_retries_before_timeout) &&
+               (now - buf->sentTime) > ((uint64) 
session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+       {
+               std::stringstream ss;
+               ss <<"ERROR, interconnect encountered a network error, please 
check your network"<<
+                       "Failed to send packet (seq "<<buf->pkt->seq<<") to 
"<<buf->conn->remoteHostAndPort<<
+                       " (pid "<<buf->pkt->dstPid<<" cid 
"<<buf->pkt->dstContentId<<") after "<<buf->nRetry<<
+                       " retries in 
"<<session_param.Gp_interconnect_transmit_timeout<<" seconds.";
+               throw ICNetworkException(ss.str(), __FILE__, __LINE__);
+       }
+
+       /*
+        * Default value of Gp_interconnect_transmit_timeout is one hours.
+        * It taks too long time to detect a network error and it is not user 
friendly.
+        *
+        * Packets would be dropped repeatly on some specific ports. We'd 
better have
+        * a warning messgage for this case and give the DBA a chance to detect 
this error
+        * earlier. Since packets would also be dropped when network is bad, we 
should not
+        * error out here, but just give a warning message. Erroring our is 
still handled
+        * by GUC Gp_interconnect_transmit_timeout as above. Note that warning 
message should
+        * be printed for each statement only once.
+        */
+       if ((buf->nRetry >= 
session_param.Gp_interconnect_min_retries_before_timeout) && 
!(*networkTimeoutIsLogged))
+       {
+               LOG(WARNING, "interconnect may encountered a network error, 
please check your network"
+                                        "Failed to send packet (seq %d) to %s 
(pid %d cid %d) after %d retries.",
+                                        buf->pkt->seq, 
buf->conn->remoteHostAndPort, buf->pkt->dstPid, buf->pkt->dstContentId, 
buf->nRetry);
+               *networkTimeoutIsLogged = true;
+       }
+}
+
+/*
+ * checkExpiration
+ *             Check whether packets expire. If a packet expires, resend the 
packet,
+ *             and adjust its position in the unack queue ring.
+ *
+ */
+void
+UDPConn::checkExpiration(ICChunkTransportState *transportStates, uint64 now)
+{
+       /* check for expiration */
+       int                     count = 0;
+       int                     retransmits = 0;
+       UDPConn *currBuffConn = NULL;
+
+       Assert(unack_queue_ring.currentTime != 0);
+       while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) && count++ < 
UNACK_QUEUE_RING_SLOTS_NUM)
+       {
+               /* expired, need to resend them */
+               ICBuffer   *curBuf = NULL;
+
+               while (true)
+               {
+                       ICBufferList *alist = 
&unack_queue_ring.slots[unack_queue_ring.idx];
+                       curBuf = alist->pop();
+                       if (curBuf == NULL)
+                               break;
+                       UDPConn *conn = static_cast<UDPConn*>(curBuf->conn);
+                       curBuf->nRetry++;
+                       putIntoUnackQueueRing(&unack_queue_ring,
+                                                                 curBuf,
+                                                                 
conn->computeExpirationPeriod(curBuf->nRetry), now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+                       trans_proto_stats.update(TPE_DATA_PKT_SEND, 
curBuf->pkt);
+#endif
+
+                       conn->sendOnce(curBuf->pkt);
+
+                       currBuffConn = conn;
+
+                       retransmits++;
+                       ic_statistics.retransmits++;
+                       currBuffConn->stat_count_resent++;
+                       currBuffConn->stat_max_resent = 
Max(currBuffConn->stat_max_resent,
+                                                                               
                currBuffConn->stat_count_resent);
+
+                       UDPConn::checkNetworkTimeout(curBuf, now, 
&transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+                       LOG(INFO, "RESEND pkt with seq %d (retry %d, rtt %lu) 
to route %d",
+                                         curBuf->pkt->seq, curBuf->nRetry, 
currBuffConn->rtt, currBuffConn->route);
+                       logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+#endif
+               }
+
+               unack_queue_ring.currentTime += TIMER_SPAN;
+               unack_queue_ring.idx = (unack_queue_ring.idx + 1) % 
(UNACK_QUEUE_RING_SLOTS_NUM);
+       }
+
+       /*
+        * deal with case when there is a long time this function is not called.
+        */
+       unack_queue_ring.currentTime = now - (now % TIMER_SPAN);
+       if (retransmits > 0)
+       {
+               snd_control_info.ssthresh = Max(snd_control_info.cwnd / 2, 
snd_control_info.minCwnd);
+               snd_control_info.cwnd = snd_control_info.minCwnd;
+       }
+}
+
+/*
+ * checkDeadlock
+ *             Check whether deadlock occurs on a connection.
+ *
+ * What this function does is to send a status query message to rx thread when
+ * the connection has not received any acks for some time. This is to avoid
+ * potential deadlock when there are continuous ack losses. Packet resending
+ * logic does not help avoiding deadlock here since the packets in the unack
+ * queue may already been removed when the sender knows that they have been
+ * already buffered in the receiver side queue.
+ *
+ * Some considerations on deadlock check time period:
+ *
+ * Potential deadlock occurs rarely. According to our experiments on various
+ * workloads and hardware. It occurred only when fault injection is enabled
+ * and a large number packets and acknowledgments are discarded. Thus, here we
+ * use a relatively large deadlock check period.
+ *
+ */
+void
+UDPConn::checkDeadlock()
+{
+       uint64          deadlockCheckTime;
+
+       if (this->unackQueue.length() == 0 && this->capacity == 0 && 
this->sndQueue.length() > 0)
+       {
+               /* we must have received some acks before deadlock occurs. */
+               Assert(this->deadlockCheckBeginTime > 0);
+
+#ifdef USE_ASSERT_CHECKING
+               if (udp_testmode)
+               {
+                       deadlockCheckTime = 100000;
+               }
+               else
+#endif
+               {
+                       deadlockCheckTime = DEADLOCK_CHECKING_TIME;
+               }
+
+               uint64          now = getCurrentTime();
+
+               /* request the capacity to avoid the deadlock case */
+               if (((now - ic_control_info.lastDeadlockCheckTime) > 
deadlockCheckTime) &&
+                       ((now - this->deadlockCheckBeginTime) > 
deadlockCheckTime))
+               {
+                       this->sendStatusQueryMessage(this->conn_info.seq - 1);
+                       ic_control_info.lastDeadlockCheckTime = now;
+                       ic_statistics.statusQueryMsgNum++;
+
+                       /* check network error. */
+                       if ((now - this->deadlockCheckBeginTime) > ((uint64) 
session_param.Gp_interconnect_transmit_timeout * 1000 * 1000))
+                       {
+                               std::stringstream ss;
+                               ss <<"ERROR, interconnect encountered a network 
error, please check your network"<<
+                                       "Did not get any response from 
"<<remoteHostAndPort<<" (pid "<<conn_info.dstPid<<
+                                       " cid "<<conn_info.dstContentId<<") in 
"<<session_param.Gp_interconnect_transmit_timeout<<" seconds.",
+                               throw ICNetworkException(ss.str(), __FILE__, 
__LINE__);
+                       }
+               }
+       }
+}
+
+/*
+ * updateRetransmitStatistics
+ *             Update the retransmit statistics.
+ */
+void
+UDPConn::updateRetransmitStatistics()
+{
+       ic_statistics.retransmits++;
+       this->stat_count_resent++;
+       this->stat_max_resent = Max(this->stat_max_resent, 
this->stat_count_resent);
+}
+
+/*
+ * checkExpirationCapacityFC
+ *             Check expiration for capacity based flow control method.
+ */
+void
+UDPConn::checkExpirationCapacityFC(int timeout)
+{
+       if (this->unackQueue.length() == 0)
+               return;
+
+       uint64          now = getCurrentTime();
+       uint64          elapsed = now - ic_control_info.lastPacketSendTime;
+
+       if (elapsed >= ((uint64) timeout * 1000))
+       {
+               ICBufferLink *bufLink = this->unackQueue.first();
+               ICBuffer   *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink);
+
+               Assert(this == buf->conn);
+               this->sendOnce(buf->pkt);
+
+               buf->nRetry++;
+               ic_control_info.lastPacketSendTime = now;
+
+               this->updateRetransmitStatistics();
+               checkNetworkTimeout(buf, now, 
&entry_->state_->networkTimeoutIsLogged);
+       }
+}
+
+/*
+ * checkExceptions
+ *             Check exceptions including packet expiration, deadlock, bg 
thread error, NIC failure...
+ *             Caller should start from 0 with retry, so that the expensive 
check for deadlock and
+ *             QD connection can be avoided in a healthy state.
+ */
+void
+UDPConn::checkExceptions(int retry, int timeout)
+{
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_CAPACITY_IC
+               /* || conn->state == mcsSetupOutgoingConnection */ )
+       {
+               this->checkExpirationCapacityFC(timeout);
+       }
+
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+       {
+               uint64 now = getCurrentTime();
+
+               if (now - ic_control_info.lastExpirationCheckTime > 
uint64(TIMER_CHECKING_PERIOD))
+               {
+                       UDPConn::checkExpiration(this->entry_->state_, now);
+                       ic_control_info.lastExpirationCheckTime = now;
+               }
+       }
+
+       if ((retry & 0x3) == 2)
+       {
+               this->checkDeadlock();
+
+               checkRxThreadError();
+               CHECK_INTERRUPTS(this->entry_->state_);
+       }
+
+       /*
+        * 1. NIC on master (and thus the QD connection) may become bad, check 
it.
+        * 2. Postmaster may become invalid, check it
+        *
+        * We check modulo 2 to correlate with the deadlock check above at the
+        * initial iteration.
+        */
+       if ((retry & 0x3f) == 2)
+       {
+               checkQDConnectionAlive();
+               CHECK_POSTMASTER_ALIVE();
+       }
+}
+
+/*
+ * computeTimeout
+ *             Compute timeout value in ms.
+ */
+int
+UDPConn::computeTimeout(int retry)
+{
+       if (this->unackQueue.length() == 0)
+               return TIMER_CHECKING_PERIOD;
+
+       ICBufferLink *bufLink = this->unackQueue.first();
+       ICBuffer   *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink);
+
+       if (buf->nRetry == 0 && retry == 0)
+               return 0;
+
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_IC)
+               return TIMER_CHECKING_PERIOD;
+
+       /* for capacity based flow control */
+       return TIMEOUT(buf->nRetry);
+}
+
+/*
+ * UDPConn::Send
+ *             is used to send a tcItem to a single destination. Tuples often 
are
+ *             *very small* we aggregate in our local buffer before sending 
into the kernel.
+ *
+ * PARAMETERS
+ *      conn - UDPConn that the tcItem is to be sent to.
+ *      tcItem - message to be sent.
+ *      motionId - Node Motion Id.
+ */
+void
+UDPConn::Send(DataBlock *data)
+{
+       int                     length = data->len;
+       int                     retry = 0;
+       bool            doCheckExpiration = false;
+       bool            gotStops = false;
+
+       Assert(this->msgSize > 0);
+
+#ifdef AMS_VERBOSE_LOGGING
+       LOG(DEBUG3, "UDPConn::Send(): msgSize %d this chunk length %d this seq 
%d",
+                       this->msgSize, data->len, this->conn_info.seq);
+#endif
+
+       if (this->msgSize + length <= global_param.Gp_max_packet_size)
+       {
+               memcpy(this->pBuff + this->msgSize, data->pos, data->len);
+               this->msgSize += length;
+
+               this->tupleCount++;
+               return;
+       }
+
+       /* prepare this for transmit */
+       ic_statistics.totalCapacity += this->capacity;
+       ic_statistics.capacityCountingTime++;
+
+       /* try to send it */
+       this->prepareXmit();
+       this->sndQueue.append(this->curBuff);
+       this->sendBuffers();
+
+       /* get a new buffer */
+       this->curBuff = NULL;
+       this->pBuff = NULL;
+
+       uint64 now = getCurrentTime();
+
+       if (session_param.Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_CAPACITY_IC)
+               doCheckExpiration = false;
+       else
+               doCheckExpiration = (now - 
ic_control_info.lastExpirationCheckTime) > MAX_TIME_NO_TIMER_CHECKING ? true : 
false;
+
+       ic_control_info.lastPacketSendTime = 0;
+       this->deadlockCheckBeginTime = now;
+
+       while (doCheckExpiration || (this->curBuff = snd_buffer_pool.get(this)) 
== NULL)
+       {
+               int timeout = (doCheckExpiration ? 0 : 
this->computeTimeout(retry));
+
+               if (this->entry_->pollAcks(timeout))
+               {
+                       bool rs = this->entry_->handleAcks();
+                       if (rs)
+                       {
+                               /*
+                                * We make sure that we deal with the stop 
messages only after
+                                * we get a buffer. Otherwise, if the stop 
message is not for
+                                * this connection, this will lead to an error 
for the
+                                * following data sending of this connection.
+                                */
+                               gotStops = true;
+                       }
+               }
+               this->checkExceptions(retry++, timeout);
+               doCheckExpiration = false;
+       }
+
+       this->pBuff = (uint8 *) this->curBuff->pkt;
+
+       if (gotStops)
+       {
+               /* handling stop message will make some connection not active 
anymore */
+               this->entry_->handleStopMsgs();
+
+               if (!this->stillActive)
+                       return;
+       }
+
+       /* reinitialize connection */
+       this->tupleCount = 0;
+       this->msgSize = sizeof(this->conn_info);
+
+       /* now we can copy the input to the new buffer */
+       memcpy(this->pBuff + this->msgSize, data->pos, data->len);
+       this->msgSize += length;
+
+       this->tupleCount++;
+}
+
+/*
+ * C++ implement for udp protocol.
+ */
+UDPConn::UDPConn(TransportEntry *entry)
+{
+       /* the field of MotionConn */
+       this->sockfd        = -1;
+       this->pBuff         = nullptr;
+       this->msgSize       = 0;
+       this->msgPos        = nullptr;
+       this->recvBytes     = 0;
+       this->tupleCount    = 0;
+       this->stillActive   = false;
+       this->stopRequested = false;
+       this->cdbProc       = nullptr;
+       this->remoteContentId      = -1;
+       this->remoteHostAndPort[0] = '\0';
+       this->opaque_data = nullptr;
+       this->sent_record_typmod = 0;
+
+       /* the field of UDPConn */
+       this->capacity  = -1;
+       this->sentSeq   = 0;
+       this->receivedAckSeq = 0;
+       this->consumedSeq    = 0;
+       this->rtt = 0;
+       this->dev = 0;
+       this->deadlockCheckBeginTime = -1;
+       this->curBuff  = nullptr;
+       this->route    = 0;
+       this->peer_len = 0;
+       this->pkt_q_capacity = 0;
+       this->pkt_q_size     = 0;
+       this->pkt_q_head     = -1;
+       this->pkt_q_tail     = -1;
+       this->pkt_q          = nullptr;
+       this->entry_ = nullptr;
+       this->stat_total_ack_time = 0;
+       this->stat_count_acks     = 0;
+       this->stat_max_ack_time   = 0;
+       this->stat_min_ack_time   = 0;
+       this->stat_count_resent   = 0;
+       this->stat_max_resent     = 0;
+       this->stat_count_dropped  = 0;
+
+       this->state = mcsNull;
+       this->sockfd = -1;
+       this->msgSize = 0;
+       this->tupleCount = 0;
+       this->stillActive = false;
+       this->stopRequested = false;
+       this->cdbProc = NULL;
+       this->opaque_data = NULL;
+       this->sent_record_typmod = 0;
+
+       /*
+        * "UDPConn dummyconn(NULL)" will be called by handleMismatch() in rx 
thread,
+        * it will lead to the error: "palloc called from thread". So code 
below should
+        * be called in MakeSendEntry() and MakeRecvEntry();
+        * if (global_param.createOpaqueDataCallback)
+        *      this->opaque_data = global_param.createOpaqueDataCallback();
+        */
+
+       this->entry_ = entry;

Review Comment:
   fix in 
[94428f1](https://github.com/apache/cloudberry/pull/1357/commits/94428f12b58a72c537d6c7bd7c79ea8fa42f2877)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to