yjhjstz commented on code in PR #1357: URL: https://github.com/apache/cloudberry/pull/1357#discussion_r2430547887
########## contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp: ########## @@ -0,0 +1,1214 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * ic_udp2_internal.hpp + * + * IDENTIFICATION + * contrib/udp2/ic_common/udp2/ic_udp2_internal.hpp + * + *------------------------------------------------------------------------- + */ +#ifndef IC_UDP2_INTERNAL_HPP +#define IC_UDP2_INTERNAL_HPP + +#include <chrono> +#include <condition_variable> +#include <mutex> +#include <sstream> +#include <stdexcept> +#include <thread> +#include <atomic> +#include <cstdarg> +#include <vector> +#include <map> + +#include <cassert> +#include <cstring> +#include <fcntl.h> +#include <assert.h> +#include <errno.h> +#include <limits.h> +#include <netdb.h> +#include <poll.h> +#include <pthread.h> +#include <signal.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> + +#include "ic_udp2.hpp" +#include "ic_utility.hpp" + +namespace { + +typedef enum MotionConnState +{ + mcsNull, + mcsAccepted, + 
mcsSetupOutgoingConnection, + mcsConnecting, + mcsRecvRegMsg, + mcsSendRegMsg, + mcsStarted, + mcsEosSent +} MotionConnState; + +/* + * Structure used for keeping track of a pt-to-pt connection between two + * Cdb Entities (either QE or QD). + */ +typedef struct MotionConn +{ + /* socket file descriptor. */ + int sockfd; + + /* pointer to the data buffer. */ + uint8 *pBuff; + + /* size of the message in the buffer, if any. */ + int32 msgSize; + + /* position of message inside of buffer, "cursor" pointer */ + uint8 *msgPos; + + /* + * recv bytes: we can have more than one message/message fragment in recv + * queue at once + */ + int32 recvBytes; + + int tupleCount; + + /* + * false means 1) received a stop message and has handled it. 2) received + * EOS message or sent out EOS message 3) received a QueryFinishPending + * notify and has handled it. + */ + bool stillActive; + + /* + * used both by motion sender and motion receiver + * + * sender: true means receiver don't need to consume tuples any more, + * sender is also responsible to send stop message to its senders. + * + * receiver: true means have sent out a stop message to its senders. The + * stop message might be lost, stopRequested can also tell sender that no + * more data needed in the ack message. + */ + bool stopRequested; + + MotionConnState state; + + ICCdbProcess *cdbProc; + int remoteContentId; + char remoteHostAndPort[128]; /* Numeric IP addresses should never + * be longer than about 50 chars, but + * play it safe */ + + void *opaque_data; + + /* + * used by the sender. + * + * the typmod of last sent record type in current connection, + * if the connection is for broadcasting then we only check + * and update this attribute on connection 0. + */ + int32 sent_record_typmod; + +} MotionConn; + +/* + * Used to organize all of the information for a given motion node. 
+ */ +typedef struct ChunkTransportStateEntry +{ + int motNodeId; + bool valid; + + /* Connection array + * + * MUST pay attention: use getMotionConn to get MotionConn. + * must not use `->conns[index]` to get MotionConn. Because the struct + * MotionConn is a base structure for MotionConnTCP and + * MotionConnUDP. After connection setup, the `conns` will be fill + * with MotionConnUDP/MotionConnTCP, but the pointer still is + * MotionConn which should use `CONTAINER_OF` to get the real object. + */ + MotionConn *conns; + int numConns; + + int scanStart; + + /* slice table entries */ + struct ICExecSlice *sendSlice; + struct ICExecSlice *recvSlice; + +} ChunkTransportStateEntry; + +typedef struct icpkthdr +{ + int32 motNodeId; + + /* + * three pairs which seem useful for identifying packets. + * + * MPP-4194: It turns out that these can cause collisions; but the high + * bit (1<<31) of the dstListener port is now used for disambiguation with + * mirrors. + */ + int32 srcPid; + int32 srcListenerPort; + + int32 dstPid; + int32 dstListenerPort; + + int32 sessionId; + int32 icId; + + int32 recvSliceIndex; + int32 sendSliceIndex; + int32 srcContentId; + int32 dstContentId; + + /* MPP-6042: add CRC field */ + uint32 crc; + + /* packet specific info */ + int32 flags; + uint32 len; + + /* + * The usage of seq and extraSeq field + * a) In a normal DATA packet + * seq -> the data packet sequence number + * extraSeq -> not used + * b) In a normal ACK message (UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY) + * seq -> the largest seq of the continuously cached packets + * sometimes, it is special, for exampke, conn req ack, mismatch ack. 
+ * extraSeq -> the largest seq of the consumed packets + * c) In a start race NAK message (UPDIC_FLAGS_NAK) + * seq -> the seq from the pkt + * extraSeq -> the extraSeq from the pkt + * d) In a DISORDER message (UDPIC_FLAGS_DISORDER) + * seq -> packet sequence number that triggers the disorder message + * extraSeq -> the largest seq of the received packets + * e) In a DUPLICATE message (UDPIC_FLAGS_DUPLICATE) + * seq -> packet sequence number that triggers the duplicate message + * extraSeq -> the largest seq of the continuously cached packets + * f) In a stop messege (UDPIC_FLAGS_STOP | UDPIC_FLAGS_ACK | UDPIC_FLAGS_CAPACITY) + * seq -> the largest seq of the continuously cached packets + * extraSeq -> the largest seq of the continuously cached packets + * + * + * NOTE that: EOS/STOP flags are often saved in conn_info structure of a connection. + * It is possible for them to be sent together with other flags. + * + */ + uint32 seq; + uint32 extraSeq; +} icpkthdr; + +typedef struct ICBuffer ICBuffer; +typedef struct ICBufferLink ICBufferLink; + +typedef enum ICBufferListType +{ + ICBufferListType_Primary, + ICBufferListType_Secondary, + ICBufferListType_UNDEFINED +} ICBufferListType; + +struct ICBufferLink +{ + ICBufferLink *next; + ICBufferLink *prev; +}; + +/* + * ICBufferList + * ic buffer list data structure. + * + * There are two kinds of lists. The first kind of list uses the primary next/prev pointers. + * And the second kind uses the secondary next/prev pointers. 
+ */ +struct ICBufferList +{ + int len; + ICBufferListType type; /* primary or secondary */ + + ICBufferLink head; + +#if defined(USE_ASSERT_CHECKING) || defined(AMS_VERBOSE_LOGGING) + void icBufferListLog(); +#endif + +#ifdef USE_ASSERT_CHECKING + void icBufferListCheck(const char *prefix); +#endif + + void init(ICBufferListType type); + void destroy(); + + bool is_head(ICBufferLink *link); + int length(); + ICBufferLink* first(); + + ICBuffer* append(ICBuffer *buf); + ICBuffer* remove(ICBuffer *buf); + ICBuffer* pop(); + + void release(bool inExpirationQueue); + + void dump_to_file(FILE *ofile); +}; + +#define GET_ICBUFFER_FROM_PRIMARY(ptr) CONTAINER_OF(ptr, ICBuffer, primary) +#define GET_ICBUFFER_FROM_SECONDARY(ptr) CONTAINER_OF(ptr, ICBuffer, secondary) + +/* + * ICBuffer + * interconnect buffer data structure. + * + * In some cases, an ICBuffer may exists in two lists/queues, + * thus it has two sets of pointers. For example, an ICBuffer + * can exist in an unack queue and an expiration queue at the same time. + * + * It is important to get the ICBuffer address when we iterate a list of + * ICBuffers through primary/secondary links. The Macro GET_ICBUFFER_FROM_PRIMARY + * and GET_ICBUFFER_FROM_SECONDARY are for this purpose. 
+ * + */ +struct ICBuffer +{ + /* primary next and prev pointers */ + ICBufferLink primary; + + /* secondary next and prev pointers */ + ICBufferLink secondary; + + /* connection that this buffer belongs to */ + MotionConn *conn; + + /* + * Three fields for expiration processing + * + * sentTime - the time this buffer was sent nRetry - the number of send + * retries unackQueueRingSlot - unack queue ring slot index + */ + uint64 sentTime; + int32 nRetry; + int32 unackQueueRingSlot; + + /* real data */ + icpkthdr pkt[0]; +}; + +static inline void* +ic_malloc(size_t size) +{ + return malloc(size); +} + +static inline void* +ic_malloc0(size_t size) +{ + void *rs = ic_malloc(size); + if (rs) + memset(rs, 0, size); + return rs; +} + +static inline void +ic_free(void *p) +{ + free(p); +} + +static inline void +ic_free_clean(void **p) +{ + ic_free(*p); + *p = NULL; +} + +static inline void +ic_usleep(long microsec) +{ + if (microsec > 0) + { + struct timeval delay; + + delay.tv_sec = microsec / 1000000L; + delay.tv_usec = microsec % 1000000L; + (void) select(0, NULL, NULL, NULL, &delay); + } +} + +/* + * Put socket into nonblock mode. + * Returns true on success, false on failure. 
+ */ +static inline bool +ic_set_noblock(int sock) +{ + int flags; + + flags = fcntl(sock, F_GETFL); + if (flags < 0) + return false; + if (fcntl(sock, F_SETFL, (flags | O_NONBLOCK)) == -1) + return false; + return true; +} + +/* ic_atomic_xxx */ +typedef struct ic_atomic_uint32 +{ + volatile uint32 value; +} ic_atomic_uint32; + +static inline void +ic_atomic_init_u32(volatile ic_atomic_uint32 *ptr, uint32 val) +{ + ptr->value = val; +} + +static inline uint32 +ic_atomic_read_u32(volatile ic_atomic_uint32 *ptr) +{ + return ptr->value; +} + +static inline void +ic_atomic_write_u32(volatile ic_atomic_uint32 *ptr, uint32 val) +{ + ptr->value = val; +} + +static inline bool +ic_atomic_compare_exchange_u32(volatile ic_atomic_uint32 *ptr, + uint32 *expected, uint32 newval) +{ + /* FIXME: we can probably use a lower consistency model */ + return __atomic_compare_exchange_n(&ptr->value, expected, newval, false, + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); +} + +static inline uint32 +ic_atomic_add_fetch_u32(volatile ic_atomic_uint32 *ptr, int32 add_) +{ + return __sync_fetch_and_add(&ptr->value, add_) + add_; +} + +static inline uint32 +ic_bswap32(uint32 x) +{ + return + ((x << 24) & 0xff000000) | + ((x << 8) & 0x00ff0000) | + ((x >> 8) & 0x0000ff00) | + ((x >> 24) & 0x000000ff); +} + +#define MAX_TRY (11) +#define TIMEOUT(try) ((try) < MAX_TRY ? 
(timeoutArray[(try)]) : (timeoutArray[MAX_TRY])) + +#define USECS_PER_SECOND 1000000 +#define MSECS_PER_SECOND 1000 + +/* 1/4 sec in msec */ +#define RX_THREAD_POLL_TIMEOUT (250) + +/* + * Flags definitions for flag-field of UDP-messages + * + * We use bit operations to test these, flags are powers of two only + */ +#define UDPIC_FLAGS_RECEIVER_TO_SENDER (1) +#define UDPIC_FLAGS_ACK (2) +#define UDPIC_FLAGS_STOP (4) +#define UDPIC_FLAGS_EOS (8) +#define UDPIC_FLAGS_NAK (16) +#define UDPIC_FLAGS_DISORDER (32) +#define UDPIC_FLAGS_DUPLICATE (64) +#define UDPIC_FLAGS_CAPACITY (128) + +#define UDPIC_MIN_BUF_SIZE (128 * 1024) + +/* + * ConnHtabBin + * + * A connection hash table bin. + * + */ +struct UDPConn; +typedef struct ConnHtabBin ConnHtabBin; +struct ConnHtabBin +{ + UDPConn *conn; + struct ConnHtabBin *next; +}; + +/* + * ConnHashTable + * + * Connection hash table definition. + * + */ +typedef struct ConnHashTable ConnHashTable; +struct ConnHashTable +{ + ConnHtabBin **table; + int size; + + bool init(); + bool add(UDPConn *conn); + UDPConn *find(icpkthdr *hdr); + void destroy(); + void remove(UDPConn *conn); +}; + +#define CONN_HASH_VALUE(icpkt) ((uint32)((((icpkt)->srcPid ^ (icpkt)->dstPid)) + (icpkt)->dstContentId)) +#define CONN_HASH_MATCH(a, b) (((a)->motNodeId == (b)->motNodeId && \ + (a)->dstContentId == (b)->dstContentId && \ + (a)->srcContentId == (b)->srcContentId && \ + (a)->recvSliceIndex == (b)->recvSliceIndex && \ + (a)->sendSliceIndex == (b)->sendSliceIndex && \ + (a)->srcPid == (b)->srcPid && \ + (a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId)) + +/* + * CursorICHistoryEntry + * + * The definition of cursor IC history entry. + */ +typedef struct CursorICHistoryEntry CursorICHistoryEntry; +struct CursorICHistoryEntry +{ + /* Interconnect instance id. */ + uint32 icId; + + /* Command id. */ + uint32 cid; + + /* + * Interconnect instance status. state 1 (value 1): interconnect is setup + * state 0 (value 0): interconnect was torn down. 
+ */ + uint8 status; + + /* Next entry. */ + CursorICHistoryEntry *next; + + CursorICHistoryEntry(uint32 aicId, uint32 acid): + icId(aicId), cid(acid),status(1){} +}; + +/* + * CursorICHistoryTable + * + * Cursor IC history table. It is a small hash table. + */ +typedef struct CursorICHistoryTable CursorICHistoryTable; +struct CursorICHistoryTable +{ + uint32 size; + uint32 count; + CursorICHistoryEntry **table; + + void init() { + count = 0; + size = session_param.Gp_interconnect_cursor_ic_table_size; + table = (CursorICHistoryEntry **)ic_malloc0(sizeof(CursorICHistoryEntry *) * size); + } + + void add(uint32 icId, uint32 cid) { + uint32 index = icId % size; + CursorICHistoryEntry *p = new CursorICHistoryEntry(icId, cid); + + p->next = this->table[index]; + this->table[index] = p; + this->count++; + + LOG(DEBUG2, "add icid %d cid %d status %d", p->icId, p->cid, p->status); + + return; + } + + /* + * state 1 (value 1): interconnect is setup + * state 0 (value 0): interconnect was torn down. + */ + void update(uint32 icId, uint8 status) { + for (CursorICHistoryEntry *p = table[icId % size]; p; p = p->next) { + if (p->icId == icId) { + p->status = status; + return; + } + } + } + + CursorICHistoryEntry* get(uint32 icId) { + for (CursorICHistoryEntry *p = table[icId % size]; p; p = p->next) { + if (p->icId == icId) + return p; + } + return NULL; + } + + void purge() { + for (uint8 index = 0; index < size; index++) { + while (table[index]) { + CursorICHistoryEntry *trash = table[index]; + table[index] = trash->next; + ic_free(trash); Review Comment: Where is the entry allocated by `CursorICHistoryEntry *p = new CursorICHistoryEntry(icId, cid);` in `add()` ever deleted? `purge()` releases entries with `ic_free(trash)`, which calls `free()`, so the `new` allocation is never paired with a matching `delete`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
