This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit bc9cc1c0a7260bfb68f70fdd78cdc186eaf26cab
Author: HouLei <[email protected]>
AuthorDate: Wed Nov 30 17:04:18 2022 +0800

    Add zstd stream compression for gpfdist to gpdb7 (#14144)
    
    Gpfdist is a high-performance ETL tool for loading external data into gpdb.
    In practice, because gpfdist can saturate the network card, it can also
    affect other processes on the same host.
    
    This is especially a problem in cloud services, where multiple services may
    share one physical network card: if gpfdist monopolizes the card, the other
    services can fail.
    
    When both high transfer throughput and low network usage are required,
    compressing the data before transferring it is a reasonable approach. This
    change therefore uses zstd stream compression for the data sent from gpfdist
    to gpdb. It involves the following aspects:
    
    - A switch flag (--compress) that turns compressed transmission on or off
      in gpfdist.
    - Changes to the HTTP request and response headers (X-GP-ZSTD) that gpdb
      and gpfdist use to agree on compression.
    - Streaming compression of data on the gpfdist side and streaming
      decompression on the gpdb side.
    
    To transfer data with zstd compression, start gpfdist with the --compress
    flag, for example "gpfdist -d /home/gpadmin -p 7070 --compress"; data sent
    from gpfdist to gpdb is then compressed. Compression is only used when both
    gpfdist and gpdb are built with zstd support.
---
 src/backend/access/external/url_curl.c             | 178 +++++++++++--
 src/bin/gpfdist/gpfdist.c                          | 195 ++++++++++++--
 src/bin/gpfdist/regress/Makefile                   |  12 +
 src/bin/gpfdist/regress/input/gpfdist2.source      |   3 +
 .../gpfdist/regress/input/gpfdist2_compress.source | 250 ++++++++++++++++++
 .../regress/output/gpfdist2_compress.source        | 290 +++++++++++++++++++++
 6 files changed, 893 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/external/url_curl.c 
b/src/backend/access/external/url_curl.c
index 523ed9fd07..3ff585111b 100644
--- a/src/backend/access/external/url_curl.c
+++ b/src/backend/access/external/url_curl.c
@@ -31,6 +31,10 @@
 #include "utils/guc.h"
 #include "utils/resowner.h"
 #include "utils/uri.h"
+#ifdef USE_ZSTD
+#include <zstd.h>
+#include <zstd_errors.h>
+#endif
 
 /*
  * This struct encapsulates the libcurl resources that need to be explicitly
@@ -41,7 +45,10 @@
  */
 typedef struct curlhandle_t
 {
-       CURL       *handle;             /* The curl handle */
+       CURL                    *handle;                /* The curl handle */
+#ifdef USE_ZSTD
+       ZSTD_DCtx               *zstd_dctx;             /* The zstd context */
+#endif
        struct curl_slist *x_httpheader;        /* list of headers */
        bool            in_multi_handle;        /* T, if the handle is in global
                                                                         * 
multi_handle */
@@ -89,8 +96,11 @@ typedef struct
        int                     error,
                                eof;                    /* error & eof flags */
        int                     gp_proto;
-       char       *http_response;
-
+       
+       int                     zstd;                   /* if gpfdist zstd 
compress is enabled, it equals 1 */
+       int                     lastsize;               /* Recording the 
compressed data size */
+       
+       char                    *http_response;
        struct
        {
                int                     datalen;        /* remaining datablock 
length */
@@ -187,6 +197,10 @@ create_curlhandle(void)
        h->x_httpheader = NULL;
        h->in_multi_handle = false;
 
+#ifdef USE_ZSTD
+       h->zstd_dctx = NULL;
+#endif
+
        h->owner = CurrentResourceOwner;
        h->prev = NULL;
        h->next = open_curl_handles;
@@ -230,7 +244,13 @@ destroy_curlhandle(curlhandle_t *h)
                curl_easy_cleanup(h->handle);
                h->handle = NULL;
        }
-
+#ifdef USE_ZSTD
+       if (h->zstd_dctx) 
+       {
+               ZSTD_freeDCtx(h->zstd_dctx);
+               h->zstd_dctx = NULL;
+       }
+#endif
        pfree(h);
 }
 
@@ -286,6 +306,8 @@ header_callback(void *ptr_, size_t size, size_t nmemb, void 
*userp)
        int             i;
        char            buf[20];
 
+       int proto_len = strlen("X-GP-PROTO"), zstd_len = strlen("X-GP-ZSTD");
+
        Assert(size == 1);
 
        /*
@@ -319,10 +341,10 @@ header_callback(void *ptr_, size_t size, size_t nmemb, 
void *userp)
        /*
         * extract the GP-PROTO value from the HTTP header.
         */
-       if (len > 10 && *ptr == 'X' && 0 == strncmp("X-GP-PROTO", ptr, 10))
+       if (len > proto_len && 0 == strncmp("X-GP-PROTO", ptr, proto_len))
        {
-               ptr += 10;
-               len -= 10;
+               ptr += proto_len;
+               len -= proto_len;
 
                while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
                {
@@ -349,6 +371,43 @@ header_callback(void *ptr_, size_t size, size_t nmemb, 
void *userp)
                }
        }
 
+       if (len > zstd_len && 0 == strncmp("X-GP-ZSTD", ptr, zstd_len))
+       {
+               ptr += zstd_len;
+               len -= zstd_len;
+
+               while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
+               {
+                       ptr++;
+                       len--;
+               }
+
+               if (len > 0 && *ptr == ':')
+               {
+                       ptr++;
+                       len--;
+
+                       while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
+                       {
+                               ptr++;
+                               len--;
+                       }
+
+                       for (i = 0; i < sizeof(buf) - 1 && i < len; i++)
+                               buf[i] = ptr[i];
+
+                       buf[i] = 0;
+#ifdef USE_ZSTD
+                       url->zstd = strtol(buf, 0, 0);
+                       if (!url->for_write && url->zstd)
+                       {
+                               url->curl->zstd_dctx = ZSTD_createDCtx();
+                               url->lastsize = 
ZSTD_initDStream(url->curl->zstd_dctx);
+                       }
+#endif
+               }
+       }
+
        return size * nmemb;
 }
 
@@ -1218,6 +1277,9 @@ url_curl_fopen(char *url, bool forwrite, extvar_t *ev, 
CopyFormatOptions *opts)
        {
                /* read specific - (TODO: unclear why some of these are needed) 
*/
                set_httpheader(file, "X-GP-PROTO", "1");
+#ifdef USE_ZSTD
+               set_httpheader(file, "X-GP-ZSTD", "1");
+#endif
                set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST);
                set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT);
                set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT);
@@ -1479,6 +1541,29 @@ gp_proto0_read(char *buf, int bufsz, URL_CURL_FILE *file)
        return n;
 }
 
+#ifdef USE_ZSTD
+/*
+ * Run one ZSTD_decompressStream() step, consuming input from 'bin' and
+ * writing decompressed bytes into 'bout'.  Returns zstd's hint: 0 once a
+ * frame is completely decoded and flushed, otherwise the suggested number of
+ * input bytes for the next call.  A zstd error is raised with ereport(ERROR),
+ * so an error code is never returned to the caller.
+ */
+static size_t
+decompress_zstd_data(ZSTD_DCtx *ctx, ZSTD_inBuffer *bin, ZSTD_outBuffer *bout)
+{
+	size_t		ret = ZSTD_decompressStream(ctx, bout, bin);
+
+	if (ZSTD_isError(ret))
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("ZSTD_decompressStream failed, error is %s",
+						ZSTD_getErrorName(ret))));
+
+	return ret;
+}
+#endif
+
 /*
  * gp_proto1_read
  *
@@ -1492,7 +1577,8 @@ static size_t
 gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyFromState 
pstate, char *buf2)
 {
        char type;
-       int  n, len;
+       int  n = 0, len = 0;
+       int obufsz = bufsz;
 
        /*
         * Loop through and get all types of messages, until we get actual data,
@@ -1639,11 +1725,41 @@ gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE 
*file, CopyFromState pstate,
                elog(ERROR, "gpfdist error: unknown meta type %d", type);
        }
 
-       /* read data block */
-       if (bufsz > file->block.datalen)
-               bufsz = file->block.datalen;
+       int left_bytes = file->in.top - file->in.bot;
+
+       if (file->zstd)
+       {
+               /* 'lastsize' is the number of bytes required for next 
decompression.
+                * 'left_bytes' is the number of bytes remained in 
'file->in.ptr'.
+                * If left_bytes is less than 'lastsize', the next decompression
+                * can't complete in a decompression operation. Thus, when 
+                * 'file->lastsize > left_bytes', we need more bytes and 
fill_buffer is called.
+                * 
+                * When the condition 'file->block.datalen == len' is met, a new
+                * request just start. In this case lastsize is an init value, 
and 
+                * cannot provide the information about how many bytes required
+                * to finish the first frame decompression. In this case, enough
+                * bytes(more than ZSTD_DStreamInSize() returning) should be 
filled
+                * into 'file->in.ptr' to ensure that the first decompression 
+                * completing.
+                */
+               if (file->lastsize > left_bytes || file->block.datalen == len)
+               {
+#ifdef USE_ZSTD
+                       bufsz = ZSTD_DStreamInSize() - left_bytes;
+                       fill_buffer(file, bufsz);
+#endif
+               }
+       } 
+       else 
+       {
+               /* read data block */
+               if (bufsz > file->block.datalen)
+                       bufsz = file->block.datalen;
+
+               fill_buffer(file, bufsz);
+       }
 
-       fill_buffer(file, bufsz);
        n = file->in.top - file->in.bot;
 
        /* if gpfdist closed connection prematurely or died catch it here */
@@ -1663,13 +1779,46 @@ gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE 
*file, CopyFromState pstate,
        if (n > bufsz)
                n = bufsz;
 
+#ifdef USE_ZSTD
+       if (file->zstd && file->curl->zstd_dctx && !file->eof)
+       {
+               int ret;
+               /* It is absolutely to put the decompression code in a loop.
+                * Since not every call of decompress_zstd_data will get data 
into bout.
+                * However, even thought there is no data in bout, the call of 
+                * decompress_zstd_data is neccersary for following 
decompression.
+                * If a empty buf is returned to gpdb, the error will occur. 
+                * So the loop ensures that we push forward the decompression 
until there 
+                * is data in bout.
+                */
+               do
+               {
+                       ZSTD_inBuffer bin = {file->in.ptr + file->in.bot, 
file->lastsize, 0};
+                       ZSTD_outBuffer bout = {buf, obufsz, 0};
+                       ret = decompress_zstd_data(file->curl->zstd_dctx, &bin, 
&bout);
+                       n = bout.pos; 
+                       file->in.bot += bin.pos;
+                       file->lastsize = ret;
+                       if (!ret)
+                       {
+                               file->lastsize = 
ZSTD_initDStream(file->curl->zstd_dctx);
+                               break;
+                       }               
+               } while (n == 0);
+       }
+       else
+       {
+               memcpy(buf, file->in.ptr + file->in.bot, n);
+               file->in.bot += n;
+       }
+#else
        memcpy(buf, file->in.ptr + file->in.bot, n);
-
        file->in.bot += n;
+#endif
        file->block.datalen -= n;
+
        return n;
 }
-
 /*
  * gp_proto0_write
  *
@@ -1821,7 +1970,6 @@ url_curl_fflush(URL_FILE *file, CopyToState pstate)
 {
        gp_proto0_write((URL_CURL_FILE *) file, pstate);
 }
-
 #else /* USE_CURL */
 
 
diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c
index 6721e95780..7037dda3cc 100644
--- a/src/bin/gpfdist/gpfdist.c
+++ b/src/bin/gpfdist/gpfdist.c
@@ -65,12 +65,18 @@
 #include <pg_config.h>
 #include <pg_config_manual.h>
 #include "gpfdist_helper.h"
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
 #ifdef USE_SSL
 #include <openssl/ssl.h>
 #include <openssl/rand.h>
 #include <openssl/err.h>
 #endif
 
+#define DEFAULT_COMPRESS_LEVEL 1
+#define MAX_FRAME_SIZE 65536
+
 /*  A data block */
 typedef struct blockhdr_t blockhdr_t;
 struct blockhdr_t
@@ -88,6 +94,7 @@ struct block_t
        blockhdr_t      hdr;
        int             bot, top;
        char*           data;
+       char*           cdata;
 };
 
 /*  Get session id for this request */
@@ -95,6 +102,7 @@ struct block_t
 
 static long REQUEST_SEQ = 0;           /*  sequence number for request */
 static long SESSION_SEQ = 0;           /*  sequence number for session */
+static long OUT_BUFFER_SIZE = 0;       /* zstd out buffer size */
 
 static bool base16_decode(char* data);
 
@@ -183,7 +191,8 @@ static struct
        struct transform* trlist; /* transforms from config file */
        const char* ssl; /* path to certificates in case we use gpfdist with 
ssl */
        int                     w; /* The time used for session timeout in 
seconds */
-} opt = { 8080, 8080, 0, 0, 0, ".", 0, 0, -1, 5, 0, 32768, 0, 256, 0, 0, 0, 0 
};
+       int                     compress; /* The flag to indicate whether 
comopression transmission is open */
+} opt = { 8080, 8080, 0, 0, 0, ".", 0, 0, -1, 5, 0, 32768, 0, 256, 0, 0, 0, 0, 
0};
 
 
 typedef union address
@@ -307,7 +316,12 @@ struct request_t
        block_t outblock;       /* next block to send out */
        char*           line_delim_str;
        int             line_delim_length;
-
+#ifdef USE_ZSTD
+       ZSTD_CCtx*              zstd_cctx;      /* zstd context */
+#endif 
+       int                             zstd;           /* request use zstd 
compress */
+       int                             zstd_err_len;   /* space allocate for 
zstd_error string */
+       char*                           zstd_error;     /* string contains zstd 
error*/
 #ifdef USE_SSL
        /* SSL related */
        BIO                     *io;            /* for the i.o. */
@@ -368,6 +382,9 @@ static int session_active_segs_isempty(session_t* session);
 static int request_validate(request_t *r);
 static int request_set_path(request_t *r, const char* d, char* p, char* pp, 
char* path);
 static int request_path_validate(request_t *r, const char* path);
+#ifdef USE_ZSTD
+static int compress_zstd(request_t *r, block_t *blk, int buflen);
+#endif
 static int request_parse_gp_headers(request_t *r, int opt_g);
 static void free_session_cb(int fd, short event, void* arg);
 #ifdef GPFXDIST
@@ -407,6 +424,31 @@ static apr_time_t shutdown_time;
 static void* watchdog_thread(void*);
 #endif
 
+static const char *EMPTY_HTTP_RES = "HTTP/1.0 200 ok\r\n"
+               "Content-type: text/plain\r\n"
+               "Content-length: 0\r\n"
+               "Expires: 0\r\n"
+               "X-GPFDIST-VERSION: " GP_VERSION "\r\n"
+               "Cache-Control: no-cache\r\n"
+               "Connection: close\r\n\r\n";
+
+static const char *HTTP_RESPONSE_ZSTD = "HTTP/1.0 200 ok\r\n"
+               "Content-type: text/plain\r\n"
+               "Expires: 0\r\n"
+               "X-GPFDIST-VERSION: " GP_VERSION "\r\n"
+               "X-GP-PROTO: %d\r\n"
+               "Cache-Control: no-cache\r\n"
+               "Connection: close\r\n"
+               "X-GP-ZSTD: %d\r\n\r\n";
+
+static const char *HTTP_RESPONSE = "HTTP/1.0 200 ok\r\n"
+               "Content-type: text/plain\r\n"
+               "Expires: 0\r\n"
+               "X-GPFDIST-VERSION: " GP_VERSION "\r\n"
+               "X-GP-PROTO: %d\r\n"
+               "Cache-Control: no-cache\r\n"
+               "Connection: close\r\n\r\n";
+
 /*
  * block_fill_header
  *
@@ -611,6 +653,7 @@ static void parse_command_line(int argc, const char* const 
argv[],
 #endif
        { "version", 256, 0, "print version number" },
        { NULL, 'w', 1, "wait for session timeout in seconds" },
+       {"compress", 258, 0, "turn on compressed transmission"},
        { 0 } };
 
        status = apr_getopt_init(&os, pool, argc, argv);
@@ -695,6 +738,15 @@ static void parse_command_line(int argc, const char* const 
argv[],
                case 'w':
                        opt.w = atoi(arg);
                        break;
+#ifdef USE_ZSTD
+               case 258:
+                       opt.compress = 1;
+                       break;
+#else
+               case 258:
+                       usage_error("ZSTD is not supported by this build", 0);
+                       break;
+#endif
                }
        }
 
@@ -872,15 +924,8 @@ static void http_error(request_t* r, int code, const char* 
msg)
 /* send an empty response */
 static void http_empty(request_t* r)
 {
-       static const char buf[] = "HTTP/1.0 200 ok\r\n"
-               "Content-type: text/plain\r\n"
-               "Content-length: 0\r\n"
-               "Expires: 0\r\n"
-               "X-GPFDIST-VERSION: " GP_VERSION "\r\n"
-               "Cache-Control: no-cache\r\n"
-               "Connection: close\r\n\r\n";
        gprintln(r, "HTTP EMPTY: %s %s %s - OK", r->peer, r->in.req->argv[0], 
r->in.req->argv[1]);
-       local_send(r, buf, sizeof buf -1);
+       local_send(r, EMPTY_HTTP_RES, strlen (EMPTY_HTTP_RES));
 }
 
 /* send a Continue response */
@@ -897,17 +942,22 @@ static void http_continue(request_t* r)
 /* send an OK response */
 static apr_status_t http_ok(request_t* r)
 {
-       const char* fmt = "HTTP/1.0 200 ok\r\n"
-               "Content-type: text/plain\r\n"
-               "Expires: 0\r\n"
-               "X-GPFDIST-VERSION: " GP_VERSION "\r\n"
-               "X-GP-PROTO: %d\r\n"
-               "Cache-Control: no-cache\r\n"
-               "Connection: close\r\n\r\n";
+
+       const char* fmt = NULL;
        char buf[1024];
        int m, n;
+       if (r->zstd)
+       {
+               fmt = HTTP_RESPONSE_ZSTD;
+               n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto, r->zstd);
+       }
+       else
+       {
+               fmt = HTTP_RESPONSE;
+               n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto);
+       }
 
-       n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto);
+       
        if (n >= sizeof(buf) - 1)
                gfatal(r, "internal error - buffer overflow during http_ok");
 
@@ -1376,10 +1426,23 @@ session_get_block(const request_t* r, block_t* 
retblock, char* line_delim_str, i
        }
 
        retblock->top = size;
-
        /* fill the block header with meta data for the client to parse and use 
*/
        block_fill_header(r, retblock, &fos);
 
+#ifdef USE_ZSTD
+       if (r->zstd)
+       {
+               int res = compress_zstd(r, retblock, size);
+               
+               if (res < 0)
+               {
+                       return r->zstd_error;
+               }
+
+               retblock->top = res;
+       }
+#endif
+
        return 0;
 }
 
@@ -1892,7 +1955,15 @@ static void do_write(int fd, short event, void* arg)
                 * write out the block data
                 */
                n = datablock->top - datablock->bot;
-               n = local_send(r, datablock->data + datablock->bot, n);
+               if (r->zstd)
+               {
+                       n = local_send(r, datablock->cdata + datablock->bot, n);
+               }
+               else
+               {
+                       n = local_send(r, datablock->data + datablock->bot, n);
+               }
+
                if (n < 0)
                {
                        /*
@@ -3439,6 +3510,11 @@ static int request_parse_gp_headers(request_t *r, int 
opt_g)
                        r->totalsegs = atoi(r->in.req->hvalue[i]);
                else if (0 == strcasecmp("X-GP-SEGMENT-ID", 
r->in.req->hname[i]))
                        r->segid = atoi(r->in.req->hvalue[i]);
+               else if (0 == strcasecmp("X-GP-ZSTD", r->in.req->hname[i]))
+               {
+                       r->zstd = atoi(r->in.req->hvalue[i]);
+                       r->zstd = opt.compress ? r->zstd : 0;
+               }
                else if (0 == strcasecmp("X-GP-LINE-DELIM-STR", 
r->in.req->hname[i]))
                {
                        if (NULL == r->in.req->hvalue[i] ||  
((int)strlen(r->in.req->hvalue[i])) % 2 == 1 || 
!base16_decode(r->in.req->hvalue[i]))
@@ -3470,6 +3546,18 @@ static int request_parse_gp_headers(request_t *r, int 
opt_g)
                }
        }
 
+#ifdef USE_ZSTD
+       if (r->zstd)
+       {
+               OUT_BUFFER_SIZE = ZSTD_CStreamOutSize();
+               r->zstd_err_len = 1024;
+               r->outblock.cdata = palloc_safe(r, r->pool, opt.m, "out of 
memory when allocating buffer for compressed data: %d bytes", opt.m);
+               r->zstd_error = palloc_safe(r, r->pool, r->zstd_err_len, "out 
of memory when allocating error buffer for compressed data: %d bytes", 
r->zstd_err_len);
+               if (r->is_get)
+                       r->zstd_cctx = ZSTD_createCStream();
+       }
+#endif
+
        if (r->line_delim_length > 0)
        {
                if (NULL == r->line_delim_str || 
(((int)strlen(r->line_delim_str)) != r->line_delim_length))
@@ -4457,6 +4545,12 @@ static void request_cleanup(request_t *r)
 {
        request_shutdown_sock(r);
        setup_do_close(r);
+#ifdef USE_ZSTD
+       if ( r->zstd && r->is_get )
+       {
+               ZSTD_freeCCtx(r->zstd_cctx);
+       }
+#endif
 }
 
 static void setup_do_close(request_t* r)
@@ -4572,3 +4666,76 @@ static void delay_watchdog_timer()
 {
 }
 #endif
+
+#ifdef USE_ZSTD
+/*
+ * compress_zstd
+ *
+ * Compress the first 'buflen' bytes of blk->data into blk->cdata as one zstd
+ * frame.  Returns the compressed length, or -1 on failure, in which case
+ * r->zstd_error describes the problem (the message is also logged).
+ *
+ * request_parse_gp_headers() allocates opt.m bytes for r->outblock.cdata;
+ * this assumes blk->cdata is that buffer, so output is bounded by opt.m and
+ * data that would not fit is reported as an error instead of overrunning it.
+ * NOTE(review): incompressible input can grow slightly, so a full opt.m-byte
+ * block may not fit; sizing cdata with ZSTD_compressBound(opt.m) would fix it.
+ */
+static int compress_zstd(request_t *r, block_t *blk, int buflen)
+{
+	size_t		capacity = (size_t) opt.m;
+	size_t		offset = 0;
+	size_t		res;
+	ZSTD_inBuffer bin = {blk->data, (size_t) buflen, 0};
+
+	if (!r->zstd_cctx)
+	{
+		snprintf(r->zstd_error, r->zstd_err_len, "Creating compression context failed, out of memory.");
+		goto fail;
+	}
+
+	res = ZSTD_initCStream(r->zstd_cctx, DEFAULT_COMPRESS_LEVEL);
+	if (ZSTD_isError(res))
+	{
+		snprintf(r->zstd_error, r->zstd_err_len, "Creating compression context initialization failed, error is %s.", ZSTD_getErrorName(res));
+		goto fail;
+	}
+
+	/* Feed all input, never offering zstd more space than cdata has left. */
+	while (bin.pos < bin.size)
+	{
+		ZSTD_outBuffer bout = {blk->cdata + offset, capacity - offset, 0};
+
+		if (bout.size == 0)
+			goto overflow;
+		res = ZSTD_compressStream(r->zstd_cctx, &bout, &bin);
+		if (ZSTD_isError(res))
+			goto compress_error;
+		offset += bout.pos;
+	}
+
+	/* Close the frame; ZSTD_endStream() returns how many bytes remain to flush. */
+	do
+	{
+		ZSTD_outBuffer bout = {blk->cdata + offset, capacity - offset, 0};
+
+		if (bout.size == 0)
+			goto overflow;
+		res = ZSTD_endStream(r->zstd_cctx, &bout);
+		if (ZSTD_isError(res))
+			goto compress_error;
+		offset += bout.pos;
+	} while (res != 0);
+
+	return (int) offset;
+
+compress_error:
+	snprintf(r->zstd_error, r->zstd_err_len, "Compression failed, error is %s.", ZSTD_getErrorName(res));
+	goto fail;
+overflow:
+	snprintf(r->zstd_error, r->zstd_err_len, "Compression failed, compressed data exceeds the %d-byte output buffer.", opt.m);
+fail:
+	gprintln(NULL, "%s", r->zstd_error);
+	return -1;
+}
+#endif
diff --git a/src/bin/gpfdist/regress/Makefile b/src/bin/gpfdist/regress/Makefile
index 0dda3ca8ab..bf7b0a9ad1 100644
--- a/src/bin/gpfdist/regress/Makefile
+++ b/src/bin/gpfdist/regress/Makefile
@@ -11,6 +11,10 @@ ifeq ($(with_openssl),yes)
 endif
 endif
 
+ifeq ($(with_zstd),yes)
+       REGRESS += gpfdist2_compress
+endif
+
 REGRESS_OPTS = --init-file=init_file
 
 installcheck: watchdog ipv4v6_ports
@@ -22,6 +26,14 @@ ifeq ($(with_openssl),yes)
        # for verify_gpfdists_cert=off
        cp data/gpfdist_ssl/certs_matching/root.crt 
data/gpfdist_ssl/certs_not_matching
 endif
+endif
+ifeq ($(with_zstd),yes)
+       rm -rf data/gpfdist2/lineitem.tbl.long
+       touch data/gpfdist2/lineitem.tbl.long
+       for name in `seq 1 1000`; \
+       do \
+               head -100 data/gpfdist2/lineitem.tbl >> 
data/gpfdist2/lineitem.tbl.long; \
+       done  
 endif
        $(top_builddir)/src/test/regress/pg_regress --dbname=gpfdist_regression 
$(REGRESS) $(REGRESS_OPTS)
 
diff --git a/src/bin/gpfdist/regress/input/gpfdist2.source 
b/src/bin/gpfdist/regress/input/gpfdist2.source
index f54c83e34d..065115e0e5 100644
--- a/src/bin/gpfdist/regress/input/gpfdist2.source
+++ b/src/bin/gpfdist/regress/input/gpfdist2.source
@@ -48,6 +48,9 @@ select * from gpfdist2_start;
 drop table if exists lineitem;
 drop external table if exists ext_lineitem;
 drop external table if exists ext_simple;
+DROP EXTERNAL TABLE IF EXISTS gz_multi_chunk_2;
+DROP EXTERNAL TABLE IF EXISTS gz_multi_chunk;
+DROP EXTERNAL TABLE IF EXISTS full_csvopt;
 -- end_ignore
 
 -- test 1 using a .gz file
diff --git a/src/bin/gpfdist/regress/input/gpfdist2_compress.source 
b/src/bin/gpfdist/regress/input/gpfdist2_compress.source
new file mode 100644
index 0000000000..8c249ca6d7
--- /dev/null
+++ b/src/bin/gpfdist/regress/input/gpfdist2_compress.source
@@ -0,0 +1,250 @@
+--
+-- GPFDIST test cases set 2. This test set is moved from cdbunit.
+--
+drop table if exists REG_REGION;
+set optimizer_print_missing_stats = off;
+CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT 
VARCHAR(152)) DISTRIBUTED BY (R_REGIONKEY);
+-- start_ignore
+-- end_ignore
+-- --------------------------------------
+-- 'gpfdist' protocol
+-- --------------------------------------
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start;
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_stop;
+CREATE EXTERNAL WEB TABLE gpfdist2_start (x text)
+execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress 
</dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 
>/dev/null 2>&1 && break; sleep 1; done; echo "starting...") '
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+
+CREATE EXTERNAL WEB TABLE gpfdist2_stop (x text)
+execute E'(ps -A -o pid,comm |grep [g]pfdist |grep -v postgres: |awk \'{print 
$1;}\' |xargs kill) > /dev/null 2>&1; echo "stopping..."'
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+
+-- start_ignore
+select * from gpfdist2_stop;
+select * from gpfdist2_start;
+-- end_ignore
+
+--- test 1 using a little file
+
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+DROP EXTERNAL TABLE ext_lineitem;
+
+-- test 2 use a bigger file.
+
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+DROP EXTERNAL TABLE ext_lineitem;
+
+-- test 3 line too long with defaults
+
+CREATE EXTERNAL TABLE ext_test (
+                id text,
+                stuff text
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/longline.txt'
+)
+FORMAT 'text'
+(
+        DELIMITER AS ','
+) LOG ERRORS SEGMENT REJECT LIMIT 2
+;
+SELECT count(*) FROM ext_test;
+DROP EXTERNAL TABLE ext_test;
+
+--test 4 using csv data
+CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION 
('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' 
(NEWLINE 'CRLF');
+SELECT count(*) FROM ext_crlf_with_lf_column;
+DROP EXTERNAL TABLE ext_crlf_with_lf_column;
+
+-- test 5 use two urls.
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long',
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+DROP EXTERNAL TABLE ext_lineitem;
+
+-- start_ignore
+select * from gpfdist2_stop;
+-- end_ignore
+
+
+--test bigger buffer start
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start;
+
+CREATE EXTERNAL WEB TABLE gpfdist2_start (x text)
+execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress -m 
1200000 </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 
127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") '
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+
+-- start_ignore
+select * from gpfdist2_start;
+-- end_ignore
+
+-- test 1 using a little file
+
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+DROP EXTERNAL TABLE ext_lineitem;
+
+-- test 2 use a bigger file.
+
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+DROP EXTERNAL TABLE ext_lineitem;
+
+-- test 3 line too long with defaults
+
+CREATE EXTERNAL TABLE ext_test (
+                id text,
+                stuff text
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/longline.txt'
+)
+FORMAT 'text'
+(
+        DELIMITER AS ','
+) LOG ERRORS SEGMENT REJECT LIMIT 2
+;
+SELECT count(*) FROM ext_test;
+DROP EXTERNAL TABLE ext_test;
+
+--test 4 using csv data
+CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION 
('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' 
(NEWLINE 'CRLF');
+SELECT count(*) FROM ext_crlf_with_lf_column;
+DROP EXTERNAL TABLE ext_crlf_with_lf_column;
\ No newline at end of file
diff --git a/src/bin/gpfdist/regress/output/gpfdist2_compress.source 
b/src/bin/gpfdist/regress/output/gpfdist2_compress.source
new file mode 100644
index 0000000000..16d9a0d7cb
--- /dev/null
+++ b/src/bin/gpfdist/regress/output/gpfdist2_compress.source
@@ -0,0 +1,290 @@
+--
+-- GPFDIST test cases set 2. This test set is moved from cdbunit.
+--
+drop table if exists REG_REGION;
+set optimizer_print_missing_stats = off;
+CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT 
VARCHAR(152)) DISTRIBUTED BY (R_REGIONKEY);
+-- start_ignore
+-- end_ignore
+-- --------------------------------------
+-- 'gpfdist' protocol
+-- --------------------------------------
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start;
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_stop;
+CREATE EXTERNAL WEB TABLE gpfdist2_start (x text)
+execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress 
</dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 
>/dev/null 2>&1 && break; sleep 1; done; echo "starting...") '
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+CREATE EXTERNAL WEB TABLE gpfdist2_stop (x text)
+execute E'(ps -A -o pid,comm |grep [g]pfdist |grep -v postgres: |awk \'{print 
$1;}\' |xargs kill) > /dev/null 2>&1; echo "stopping..."'
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+-- start_ignore
+select * from gpfdist2_stop;
+      x      
+-------------
+ stopping...
+(1 row)
+
+select * from gpfdist2_start;
+      x      
+-------------
+ starting...
+(1 row)
+
+-- end_ignore
+--- test 1 using a little file
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+ count 
+-------
+   256
+(1 row)
+
+DROP EXTERNAL TABLE ext_lineitem;
+-- test 2 use a bigger file.
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+ count  
+--------
+ 100000
+(1 row)
+
+DROP EXTERNAL TABLE ext_lineitem;
+-- test 3 line too long with defaults
+CREATE EXTERNAL TABLE ext_test (
+                id text,
+                stuff text
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/longline.txt'
+)
+FORMAT 'text'
+(
+        DELIMITER AS ','
+) LOG ERRORS SEGMENT REJECT LIMIT 2
+;
+SELECT count(*) FROM ext_test;
+ERROR:  gpfdist error - line too long in file 
@abs_srcdir@/data/gpfdist2/longline.txt near (0 bytes)  (seg1 slice1 
127.0.0.1:7001 pid=30337)
+CONTEXT:  External table ext_test, line 1 of file 
gpfdist://@hostname@:7070/gpfdist2/longline.txt
+DROP EXTERNAL TABLE ext_test;
+--test 4 using csv data
+CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION 
('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' 
(NEWLINE 'CRLF');
+SELECT count(*) FROM ext_crlf_with_lf_column;
+ count 
+-------
+ 10367
+(1 row)
+
+DROP EXTERNAL TABLE ext_crlf_with_lf_column;
+-- test 5 use two urls.
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long',
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+ count  
+--------
+ 100256
+(1 row)
+
+DROP EXTERNAL TABLE ext_lineitem;
+-- start_ignore
+select * from gpfdist2_stop;
+      x      
+-------------
+ stopping...
+(1 row)
+
+-- end_ignore
+--test bigger buffer start
+DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start;
+CREATE EXTERNAL WEB TABLE gpfdist2_start (x text)
+execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress -m 
1200000 </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 
127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") '
+on SEGMENT 0
+FORMAT 'text' (delimiter '|');
+-- start_ignore
+select * from gpfdist2_start;
+      x      
+-------------
+ starting...
+(1 row)
+
+-- end_ignore
+-- test 1 using a little file
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+ count 
+-------
+   256
+(1 row)
+
+DROP EXTERNAL TABLE ext_lineitem;
+-- test 2 use a bigger file.
+CREATE EXTERNAL TABLE ext_lineitem (
+                L_ORDERKEY INT8,
+                L_PARTKEY INTEGER,
+                L_SUPPKEY INTEGER,
+                L_LINENUMBER integer,
+                L_QUANTITY decimal,
+                L_EXTENDEDPRICE decimal,
+                L_DISCOUNT decimal,
+                L_TAX decimal,
+                L_RETURNFLAG CHAR(1),
+                L_LINESTATUS CHAR(1),
+                L_SHIPDATE date,
+                L_COMMITDATE date,
+                L_RECEIPTDATE date,
+                L_SHIPINSTRUCT CHAR(25),
+                L_SHIPMODE CHAR(10),
+                L_COMMENT VARCHAR(44)
+                )
+LOCATION
+(
+        'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long'
+)
+FORMAT 'text'
+(
+        DELIMITER AS '|'
+)
+;
+SELECT count(*) FROM ext_lineitem;
+ count  
+--------
+ 100000
+(1 row)
+
+DROP EXTERNAL TABLE ext_lineitem;
+-- test 3 line too long with defaults
+CREATE EXTERNAL TABLE ext_test (
+                id text,
+                stuff text
+                )
+LOCATION
+(
+      'gpfdist://@hostname@:7070/gpfdist2/longline.txt'
+)
+FORMAT 'text'
+(
+        DELIMITER AS ','
+) LOG ERRORS SEGMENT REJECT LIMIT 2
+;
+SELECT count(*) FROM ext_test;
+ count 
+-------
+     1
+(1 row)
+
+DROP EXTERNAL TABLE ext_test;
+--test 4 using csv data
+CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION 
('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' 
(NEWLINE 'CRLF');
+SELECT count(*) FROM ext_crlf_with_lf_column;
+ count 
+-------
+ 10367
+(1 row)
+
+DROP EXTERNAL TABLE ext_crlf_with_lf_column;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to