This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit bc9cc1c0a7260bfb68f70fdd78cdc186eaf26cab Author: HouLei <[email protected]> AuthorDate: Wed Nov 30 17:04:18 2022 +0800 Add stream zstd compress for gpfdist to gpdb7 (#14144) Gpfdist is a high-performance ETL tool for loading external data into gpdb. In practice, because gpfdist can use the full bandwidth of the network card, it can also affect other processes on the same host. This matters especially in cloud services, where multiple services may share one physical network card: if gpfdist monopolizes the network card, other services may fail. When both high transmission efficiency and low network usage are required, compressing the data before transferring it is a reasonable approach. This code change therefore uses stream compression based on the zstd algorithm to transfer data from gpfdist to gpdb. The code change involves the following aspects: (1) A switch flag (compress) to turn compressed transmission on or off for gpfdist. (2) Changes to the HTTP headers of the requests and responses exchanged between gpdb and gpfdist. (3) Streaming compression of the data (on the gpfdist end) and streaming decompression of the data (on the gpdb end). Finally, to use zstd compression when transferring data, add the --compress flag when starting gpfdist. For example, run gpfdist -d /home/gpadmin -p 7070 --compress to start gpfdist; the data sent from gpfdist to gpdb will then be compressed. 
--- src/backend/access/external/url_curl.c | 178 +++++++++++-- src/bin/gpfdist/gpfdist.c | 195 ++++++++++++-- src/bin/gpfdist/regress/Makefile | 12 + src/bin/gpfdist/regress/input/gpfdist2.source | 3 + .../gpfdist/regress/input/gpfdist2_compress.source | 250 ++++++++++++++++++ .../regress/output/gpfdist2_compress.source | 290 +++++++++++++++++++++ 6 files changed, 893 insertions(+), 35 deletions(-) diff --git a/src/backend/access/external/url_curl.c b/src/backend/access/external/url_curl.c index 523ed9fd07..3ff585111b 100644 --- a/src/backend/access/external/url_curl.c +++ b/src/backend/access/external/url_curl.c @@ -31,6 +31,10 @@ #include "utils/guc.h" #include "utils/resowner.h" #include "utils/uri.h" +#ifdef USE_ZSTD +#include <zstd.h> +#include <zstd_errors.h> +#endif /* * This struct encapsulates the libcurl resources that need to be explicitly @@ -41,7 +45,10 @@ */ typedef struct curlhandle_t { - CURL *handle; /* The curl handle */ + CURL *handle; /* The curl handle */ +#ifdef USE_ZSTD + ZSTD_DCtx *zstd_dctx; /* The zstd context */ +#endif struct curl_slist *x_httpheader; /* list of headers */ bool in_multi_handle; /* T, if the handle is in global * multi_handle */ @@ -89,8 +96,11 @@ typedef struct int error, eof; /* error & eof flags */ int gp_proto; - char *http_response; - + + int zstd; /* if gpfdist zstd compress is enabled, it equals 1 */ + int lastsize; /* Recording the compressed data size */ + + char *http_response; struct { int datalen; /* remaining datablock length */ @@ -187,6 +197,10 @@ create_curlhandle(void) h->x_httpheader = NULL; h->in_multi_handle = false; +#ifdef USE_ZSTD + h->zstd_dctx = NULL; +#endif + h->owner = CurrentResourceOwner; h->prev = NULL; h->next = open_curl_handles; @@ -230,7 +244,13 @@ destroy_curlhandle(curlhandle_t *h) curl_easy_cleanup(h->handle); h->handle = NULL; } - +#ifdef USE_ZSTD + if (h->zstd_dctx) + { + ZSTD_freeDCtx(h->zstd_dctx); + h->zstd_dctx = NULL; + } +#endif pfree(h); } @@ -286,6 +306,8 @@ 
header_callback(void *ptr_, size_t size, size_t nmemb, void *userp) int i; char buf[20]; + int proto_len = strlen("X-GP-PROTO"), zstd_len = strlen("X-GP-ZSTD"); + Assert(size == 1); /* @@ -319,10 +341,10 @@ header_callback(void *ptr_, size_t size, size_t nmemb, void *userp) /* * extract the GP-PROTO value from the HTTP header. */ - if (len > 10 && *ptr == 'X' && 0 == strncmp("X-GP-PROTO", ptr, 10)) + if (len > proto_len && 0 == strncmp("X-GP-PROTO", ptr, proto_len)) { - ptr += 10; - len -= 10; + ptr += proto_len; + len -= proto_len; while (len > 0 && (*ptr == ' ' || *ptr == '\t')) { @@ -349,6 +371,43 @@ header_callback(void *ptr_, size_t size, size_t nmemb, void *userp) } } + if (len > zstd_len && 0 == strncmp("X-GP-ZSTD", ptr, zstd_len)) + { + ptr += zstd_len; + len -= zstd_len; + + while (len > 0 && (*ptr == ' ' || *ptr == '\t')) + { + ptr++; + len--; + } + + if (len > 0 && *ptr == ':') + { + ptr++; + len--; + + while (len > 0 && (*ptr == ' ' || *ptr == '\t')) + { + ptr++; + len--; + } + + for (i = 0; i < sizeof(buf) - 1 && i < len; i++) + buf[i] = ptr[i]; + + buf[i] = 0; +#ifdef USE_ZSTD + url->zstd = strtol(buf, 0, 0); + if (!url->for_write && url->zstd) + { + url->curl->zstd_dctx = ZSTD_createDCtx(); + url->lastsize = ZSTD_initDStream(url->curl->zstd_dctx); + } +#endif + } + } + return size * nmemb; } @@ -1218,6 +1277,9 @@ url_curl_fopen(char *url, bool forwrite, extvar_t *ev, CopyFormatOptions *opts) { /* read specific - (TODO: unclear why some of these are needed) */ set_httpheader(file, "X-GP-PROTO", "1"); +#ifdef USE_ZSTD + set_httpheader(file, "X-GP-ZSTD", "1"); +#endif set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST); set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT); set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT); @@ -1479,6 +1541,29 @@ gp_proto0_read(char *buf, int bufsz, URL_CURL_FILE *file) return n; } +#ifdef USE_ZSTD +int +decompress_zstd_data(ZSTD_DCtx* ctx, ZSTD_inBuffer* bin, ZSTD_outBuffer* bout) +{ + + size_t ret; + 
/* Ret indicates the number of bytes of next data frame to be decompressed. + * And if an error occur in ZSTD_decompressStream, ret will be a error number. + * If ZSTD_isError is true, the ret is a error number. + * The content of the error can be got by ZSTD_getErrorName.. + */ + ret = ZSTD_decompressStream(ctx, bout, bin); + + if (ZSTD_isError(ret)) + { + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("ZSTD_decompressStream failed, error is %s", ZSTD_getErrorName(ret)))); + } + return ret; +} +#endif + /* * gp_proto1_read * @@ -1492,7 +1577,8 @@ static size_t gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyFromState pstate, char *buf2) { char type; - int n, len; + int n = 0, len = 0; + int obufsz = bufsz; /* * Loop through and get all types of messages, until we get actual data, @@ -1639,11 +1725,41 @@ gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyFromState pstate, elog(ERROR, "gpfdist error: unknown meta type %d", type); } - /* read data block */ - if (bufsz > file->block.datalen) - bufsz = file->block.datalen; + int left_bytes = file->in.top - file->in.bot; + + if (file->zstd) + { + /* 'lastsize' is the number of bytes required for next decompression. + * 'left_bytes' is the number of bytes remained in 'file->in.ptr'. + * If left_bytes is less than 'lastsize', the next decompression + * can't complete in a decompression operation. Thus, when + * 'file->lastsize > left_bytes', we need more bytes and fill_buffer is called. + * + * When the condition 'file->block.datalen == len' is met, a new + * request just start. In this case lastsize is an init value, and + * cannot provide the information about how many bytes required + * to finish the first frame decompression. In this case, enough + * bytes(more than ZSTD_DStreamInSize() returning) should be filled + * into 'file->in.ptr' to ensure that the first decompression + * completing. 
+ */ + if (file->lastsize > left_bytes || file->block.datalen == len) + { +#ifdef USE_ZSTD + bufsz = ZSTD_DStreamInSize() - left_bytes; + fill_buffer(file, bufsz); +#endif + } + } + else + { + /* read data block */ + if (bufsz > file->block.datalen) + bufsz = file->block.datalen; + + fill_buffer(file, bufsz); + } - fill_buffer(file, bufsz); n = file->in.top - file->in.bot; /* if gpfdist closed connection prematurely or died catch it here */ @@ -1663,13 +1779,46 @@ gp_proto1_read(char *buf, int bufsz, URL_CURL_FILE *file, CopyFromState pstate, if (n > bufsz) n = bufsz; +#ifdef USE_ZSTD + if (file->zstd && file->curl->zstd_dctx && !file->eof) + { + int ret; + /* It is absolutely to put the decompression code in a loop. + * Since not every call of decompress_zstd_data will get data into bout. + * However, even thought there is no data in bout, the call of + * decompress_zstd_data is neccersary for following decompression. + * If a empty buf is returned to gpdb, the error will occur. + * So the loop ensures that we push forward the decompression until there + * is data in bout. 
+ */ + do + { + ZSTD_inBuffer bin = {file->in.ptr + file->in.bot, file->lastsize, 0}; + ZSTD_outBuffer bout = {buf, obufsz, 0}; + ret = decompress_zstd_data(file->curl->zstd_dctx, &bin, &bout); + n = bout.pos; + file->in.bot += bin.pos; + file->lastsize = ret; + if (!ret) + { + file->lastsize = ZSTD_initDStream(file->curl->zstd_dctx); + break; + } + } while (n == 0); + } + else + { + memcpy(buf, file->in.ptr + file->in.bot, n); + file->in.bot += n; + } +#else memcpy(buf, file->in.ptr + file->in.bot, n); - file->in.bot += n; +#endif file->block.datalen -= n; + return n; } - /* * gp_proto0_write * @@ -1821,7 +1970,6 @@ url_curl_fflush(URL_FILE *file, CopyToState pstate) { gp_proto0_write((URL_CURL_FILE *) file, pstate); } - #else /* USE_CURL */ diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index 6721e95780..7037dda3cc 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -65,12 +65,18 @@ #include <pg_config.h> #include <pg_config_manual.h> #include "gpfdist_helper.h" +#ifdef USE_ZSTD +#include <zstd.h> +#endif #ifdef USE_SSL #include <openssl/ssl.h> #include <openssl/rand.h> #include <openssl/err.h> #endif +#define DEFAULT_COMPRESS_LEVEL 1 +#define MAX_FRAME_SIZE 65536 + /* A data block */ typedef struct blockhdr_t blockhdr_t; struct blockhdr_t @@ -88,6 +94,7 @@ struct block_t blockhdr_t hdr; int bot, top; char* data; + char* cdata; }; /* Get session id for this request */ @@ -95,6 +102,7 @@ struct block_t static long REQUEST_SEQ = 0; /* sequence number for request */ static long SESSION_SEQ = 0; /* sequence number for session */ +static long OUT_BUFFER_SIZE = 0; /* zstd out buffer size */ static bool base16_decode(char* data); @@ -183,7 +191,8 @@ static struct struct transform* trlist; /* transforms from config file */ const char* ssl; /* path to certificates in case we use gpfdist with ssl */ int w; /* The time used for session timeout in seconds */ -} opt = { 8080, 8080, 0, 0, 0, ".", 0, 0, -1, 5, 0, 32768, 0, 256, 0, 0, 0, 
0 }; + int compress; /* The flag to indicate whether comopression transmission is open */ +} opt = { 8080, 8080, 0, 0, 0, ".", 0, 0, -1, 5, 0, 32768, 0, 256, 0, 0, 0, 0, 0}; typedef union address @@ -307,7 +316,12 @@ struct request_t block_t outblock; /* next block to send out */ char* line_delim_str; int line_delim_length; - +#ifdef USE_ZSTD + ZSTD_CCtx* zstd_cctx; /* zstd context */ +#endif + int zstd; /* request use zstd compress */ + int zstd_err_len; /* space allocate for zstd_error string */ + char* zstd_error; /* string contains zstd error*/ #ifdef USE_SSL /* SSL related */ BIO *io; /* for the i.o. */ @@ -368,6 +382,9 @@ static int session_active_segs_isempty(session_t* session); static int request_validate(request_t *r); static int request_set_path(request_t *r, const char* d, char* p, char* pp, char* path); static int request_path_validate(request_t *r, const char* path); +#ifdef USE_ZSTD +static int compress_zstd(request_t *r, block_t *blk, int buflen); +#endif static int request_parse_gp_headers(request_t *r, int opt_g); static void free_session_cb(int fd, short event, void* arg); #ifdef GPFXDIST @@ -407,6 +424,31 @@ static apr_time_t shutdown_time; static void* watchdog_thread(void*); #endif +static const char *EMPTY_HTTP_RES = "HTTP/1.0 200 ok\r\n" + "Content-type: text/plain\r\n" + "Content-length: 0\r\n" + "Expires: 0\r\n" + "X-GPFDIST-VERSION: " GP_VERSION "\r\n" + "Cache-Control: no-cache\r\n" + "Connection: close\r\n\r\n"; + +static const char *HTTP_RESPONSE_ZSTD = "HTTP/1.0 200 ok\r\n" + "Content-type: text/plain\r\n" + "Expires: 0\r\n" + "X-GPFDIST-VERSION: " GP_VERSION "\r\n" + "X-GP-PROTO: %d\r\n" + "Cache-Control: no-cache\r\n" + "Connection: close\r\n" + "X-GP-ZSTD: %d\r\n\r\n"; + +static const char *HTTP_RESPONSE = "HTTP/1.0 200 ok\r\n" + "Content-type: text/plain\r\n" + "Expires: 0\r\n" + "X-GPFDIST-VERSION: " GP_VERSION "\r\n" + "X-GP-PROTO: %d\r\n" + "Cache-Control: no-cache\r\n" + "Connection: close\r\n\r\n"; + /* * block_fill_header * 
@@ -611,6 +653,7 @@ static void parse_command_line(int argc, const char* const argv[], #endif { "version", 256, 0, "print version number" }, { NULL, 'w', 1, "wait for session timeout in seconds" }, + {"compress", 258, 0, "turn on compressed transmission"}, { 0 } }; status = apr_getopt_init(&os, pool, argc, argv); @@ -695,6 +738,15 @@ static void parse_command_line(int argc, const char* const argv[], case 'w': opt.w = atoi(arg); break; +#ifdef USE_ZSTD + case 258: + opt.compress = 1; + break; +#else + case 258: + usage_error("ZSTD is not supported by this build", 0); + break; +#endif } } @@ -872,15 +924,8 @@ static void http_error(request_t* r, int code, const char* msg) /* send an empty response */ static void http_empty(request_t* r) { - static const char buf[] = "HTTP/1.0 200 ok\r\n" - "Content-type: text/plain\r\n" - "Content-length: 0\r\n" - "Expires: 0\r\n" - "X-GPFDIST-VERSION: " GP_VERSION "\r\n" - "Cache-Control: no-cache\r\n" - "Connection: close\r\n\r\n"; gprintln(r, "HTTP EMPTY: %s %s %s - OK", r->peer, r->in.req->argv[0], r->in.req->argv[1]); - local_send(r, buf, sizeof buf -1); + local_send(r, EMPTY_HTTP_RES, strlen (EMPTY_HTTP_RES)); } /* send a Continue response */ @@ -897,17 +942,22 @@ static void http_continue(request_t* r) /* send an OK response */ static apr_status_t http_ok(request_t* r) { - const char* fmt = "HTTP/1.0 200 ok\r\n" - "Content-type: text/plain\r\n" - "Expires: 0\r\n" - "X-GPFDIST-VERSION: " GP_VERSION "\r\n" - "X-GP-PROTO: %d\r\n" - "Cache-Control: no-cache\r\n" - "Connection: close\r\n\r\n"; + + const char* fmt = NULL; char buf[1024]; int m, n; + if (r->zstd) + { + fmt = HTTP_RESPONSE_ZSTD; + n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto, r->zstd); + } + else + { + fmt = HTTP_RESPONSE; + n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto); + } - n = apr_snprintf(buf, sizeof(buf), fmt, r->gp_proto); + if (n >= sizeof(buf) - 1) gfatal(r, "internal error - buffer overflow during http_ok"); @@ -1376,10 +1426,23 @@ 
session_get_block(const request_t* r, block_t* retblock, char* line_delim_str, i } retblock->top = size; - /* fill the block header with meta data for the client to parse and use */ block_fill_header(r, retblock, &fos); +#ifdef USE_ZSTD + if (r->zstd) + { + int res = compress_zstd(r, retblock, size); + + if (res < 0) + { + return r->zstd_error; + } + + retblock->top = res; + } +#endif + return 0; } @@ -1892,7 +1955,15 @@ static void do_write(int fd, short event, void* arg) * write out the block data */ n = datablock->top - datablock->bot; - n = local_send(r, datablock->data + datablock->bot, n); + if (r->zstd) + { + n = local_send(r, datablock->cdata + datablock->bot, n); + } + else + { + n = local_send(r, datablock->data + datablock->bot, n); + } + if (n < 0) { /* @@ -3439,6 +3510,11 @@ static int request_parse_gp_headers(request_t *r, int opt_g) r->totalsegs = atoi(r->in.req->hvalue[i]); else if (0 == strcasecmp("X-GP-SEGMENT-ID", r->in.req->hname[i])) r->segid = atoi(r->in.req->hvalue[i]); + else if (0 == strcasecmp("X-GP-ZSTD", r->in.req->hname[i])) + { + r->zstd = atoi(r->in.req->hvalue[i]); + r->zstd = opt.compress ? 
r->zstd : 0; + } else if (0 == strcasecmp("X-GP-LINE-DELIM-STR", r->in.req->hname[i])) { if (NULL == r->in.req->hvalue[i] || ((int)strlen(r->in.req->hvalue[i])) % 2 == 1 || !base16_decode(r->in.req->hvalue[i])) @@ -3470,6 +3546,18 @@ static int request_parse_gp_headers(request_t *r, int opt_g) } } +#ifdef USE_ZSTD + if (r->zstd) + { + OUT_BUFFER_SIZE = ZSTD_CStreamOutSize(); + r->zstd_err_len = 1024; + r->outblock.cdata = palloc_safe(r, r->pool, opt.m, "out of memory when allocating buffer for compressed data: %d bytes", opt.m); + r->zstd_error = palloc_safe(r, r->pool, r->zstd_err_len, "out of memory when allocating error buffer for compressed data: %d bytes", r->zstd_err_len); + if (r->is_get) + r->zstd_cctx = ZSTD_createCStream(); + } +#endif + if (r->line_delim_length > 0) { if (NULL == r->line_delim_str || (((int)strlen(r->line_delim_str)) != r->line_delim_length)) @@ -4457,6 +4545,12 @@ static void request_cleanup(request_t *r) { request_shutdown_sock(r); setup_do_close(r); +#ifdef USE_ZSTD + if ( r->zstd && r->is_get ) + { + ZSTD_freeCCtx(r->zstd_cctx); + } +#endif } static void setup_do_close(request_t* r) @@ -4572,3 +4666,64 @@ static void delay_watchdog_timer() { } #endif + +#ifdef USE_ZSTD +/* + * compress_zstd + * It is for compress data in buffer. Return is the length of data after compression. 
+ */ + +static int compress_zstd(request_t *r, block_t *blk, int buflen) +{ + char *buf = blk->data; + int offset = 0; + int cursor = 0; + + if (!r->zstd_cctx) + { + snprintf(r->zstd_error, r->zstd_err_len, "Creating compression context failed, out of memory."); + gprintln(NULL, r->zstd_error); + return -1; + } + + size_t init_result = ZSTD_initCStream(r->zstd_cctx, DEFAULT_COMPRESS_LEVEL); + if (ZSTD_isError(init_result)) + { + snprintf(r->zstd_error, r->zstd_err_len, "Creating compression context initialization failed, error is %s.", ZSTD_getErrorName(init_result)); + gprintln(NULL, r->zstd_error); + return -1; + } + + while(cursor < buflen){ + int in_size = (buflen - cursor) > MAX_FRAME_SIZE ? MAX_FRAME_SIZE : (buflen - cursor); + ZSTD_inBuffer bin = {buf + cursor, in_size, 0}; + int outpos = 0; + while(bin.pos < bin.size){ + ZSTD_outBuffer bout = {blk->cdata + offset, OUT_BUFFER_SIZE - outpos, 0}; + size_t res = ZSTD_compressStream(r->zstd_cctx, &bout, &bin); + + if (ZSTD_isError(res)) + { + snprintf(r->zstd_error, r->zstd_err_len, "Compression failed, error is %s.", ZSTD_getErrorName(res)); + gprintln(NULL, r->zstd_error); + return -1; + } + offset += bout.pos; + outpos = bout.pos; + } + cursor += in_size; + } + + ZSTD_outBuffer output = { r->outblock.cdata + offset, OUT_BUFFER_SIZE, 0 }; + size_t const remainingToFlush = ZSTD_endStream(r->zstd_cctx, &output); /* close frame */ + if (remainingToFlush) + { + snprintf(r->zstd_error, r->zstd_err_len, "Compression failed, error is not fully flushed."); + gprintln(NULL, r->zstd_error); + return -1; + } + offset += output.pos; + + return offset; +} +#endif diff --git a/src/bin/gpfdist/regress/Makefile b/src/bin/gpfdist/regress/Makefile index 0dda3ca8ab..bf7b0a9ad1 100644 --- a/src/bin/gpfdist/regress/Makefile +++ b/src/bin/gpfdist/regress/Makefile @@ -11,6 +11,10 @@ ifeq ($(with_openssl),yes) endif endif +ifeq ($(with_zstd),yes) + REGRESS += gpfdist2_compress +endif + REGRESS_OPTS = --init-file=init_file 
installcheck: watchdog ipv4v6_ports @@ -22,6 +26,14 @@ ifeq ($(with_openssl),yes) # for verify_gpfdists_cert=off cp data/gpfdist_ssl/certs_matching/root.crt data/gpfdist_ssl/certs_not_matching endif +endif +ifeq ($(with_zstd),yes) + rm -rf data/gpfdist2/lineitem.tbl.long + touch data/gpfdist2/lineitem.tbl.long + for name in `seq 1 1000`; \ + do \ + head -100 data/gpfdist2/lineitem.tbl >> data/gpfdist2/lineitem.tbl.long; \ + done endif $(top_builddir)/src/test/regress/pg_regress --dbname=gpfdist_regression $(REGRESS) $(REGRESS_OPTS) diff --git a/src/bin/gpfdist/regress/input/gpfdist2.source b/src/bin/gpfdist/regress/input/gpfdist2.source index f54c83e34d..065115e0e5 100644 --- a/src/bin/gpfdist/regress/input/gpfdist2.source +++ b/src/bin/gpfdist/regress/input/gpfdist2.source @@ -48,6 +48,9 @@ select * from gpfdist2_start; drop table if exists lineitem; drop external table if exists ext_lineitem; drop external table if exists ext_simple; +DROP EXTERNAL TABLE IF EXISTS gz_multi_chunk_2; +DROP EXTERNAL TABLE IF EXISTS gz_multi_chunk; +DROP EXTERNAL TABLE IF EXISTS full_csvopt; -- end_ignore -- test 1 using a .gz file diff --git a/src/bin/gpfdist/regress/input/gpfdist2_compress.source b/src/bin/gpfdist/regress/input/gpfdist2_compress.source new file mode 100644 index 0000000000..8c249ca6d7 --- /dev/null +++ b/src/bin/gpfdist/regress/input/gpfdist2_compress.source @@ -0,0 +1,250 @@ +-- +-- GPFDIST test cases set 2. This test set is moved from cdbunit. 
+-- +drop table if exists REG_REGION; +set optimizer_print_missing_stats = off; +CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT VARCHAR(152)) DISTRIBUTED BY (R_REGIONKEY); +-- start_ignore +-- end_ignore +-- -------------------------------------- +-- 'gpfdist' protocol +-- -------------------------------------- +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start; +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_stop; +CREATE EXTERNAL WEB TABLE gpfdist2_start (x text) +execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") ' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); + +CREATE EXTERNAL WEB TABLE gpfdist2_stop (x text) +execute E'(ps -A -o pid,comm |grep [g]pfdist |grep -v postgres: |awk \'{print $1;}\' |xargs kill) > /dev/null 2>&1; echo "stopping..."' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); + +-- start_ignore +select * from gpfdist2_stop; +select * from gpfdist2_start; +-- end_ignore + +--- test 1 using a little file + +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; +DROP EXTERNAL TABLE ext_lineitem; + +-- test 2 use a bigger file. 
+ +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; +DROP EXTERNAL TABLE ext_lineitem; + +-- test 3 line too long with defaults + +CREATE EXTERNAL TABLE ext_test ( + id text, + stuff text + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/longline.txt' +) +FORMAT 'text' +( + DELIMITER AS ',' +) LOG ERRORS SEGMENT REJECT LIMIT 2 +; +SELECT count(*) FROM ext_test; +DROP EXTERNAL TABLE ext_test; + +--test 4 using csv data +CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION ('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' (NEWLINE 'CRLF'); +SELECT count(*) FROM ext_crlf_with_lf_column; +DROP EXTERNAL TABLE ext_crlf_with_lf_column; + +-- test 5 use two urls. 
+CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long', + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; +DROP EXTERNAL TABLE ext_lineitem; + +-- start_ignore +select * from gpfdist2_stop; +-- end_ignore + + +--test bigger buffer start +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start; + +CREATE EXTERNAL WEB TABLE gpfdist2_start (x text) +execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress -m 1200000 </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") ' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); + +-- start_ignore +select * from gpfdist2_start; +-- end_ignore + +-- test 1 using a little file + +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; +DROP EXTERNAL TABLE ext_lineitem; + +-- test 2 use a bigger file. 
+ +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; +DROP EXTERNAL TABLE ext_lineitem; + +-- test 3 line too long with defaults + +CREATE EXTERNAL TABLE ext_test ( + id text, + stuff text + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/longline.txt' +) +FORMAT 'text' +( + DELIMITER AS ',' +) LOG ERRORS SEGMENT REJECT LIMIT 2 +; +SELECT count(*) FROM ext_test; +DROP EXTERNAL TABLE ext_test; + +--test 4 using csv data +CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION ('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' (NEWLINE 'CRLF'); +SELECT count(*) FROM ext_crlf_with_lf_column; +DROP EXTERNAL TABLE ext_crlf_with_lf_column; \ No newline at end of file diff --git a/src/bin/gpfdist/regress/output/gpfdist2_compress.source b/src/bin/gpfdist/regress/output/gpfdist2_compress.source new file mode 100644 index 0000000000..16d9a0d7cb --- /dev/null +++ b/src/bin/gpfdist/regress/output/gpfdist2_compress.source @@ -0,0 +1,290 @@ +-- +-- GPFDIST test cases set 2. This test set is moved from cdbunit. 
+-- +drop table if exists REG_REGION; +set optimizer_print_missing_stats = off; +CREATE TABLE REG_REGION (R_REGIONKEY INT, R_NAME CHAR(25), R_COMMENT VARCHAR(152)) DISTRIBUTED BY (R_REGIONKEY); +-- start_ignore +-- end_ignore +-- -------------------------------------- +-- 'gpfdist' protocol +-- -------------------------------------- +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start; +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_stop; +CREATE EXTERNAL WEB TABLE gpfdist2_start (x text) +execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") ' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); +CREATE EXTERNAL WEB TABLE gpfdist2_stop (x text) +execute E'(ps -A -o pid,comm |grep [g]pfdist |grep -v postgres: |awk \'{print $1;}\' |xargs kill) > /dev/null 2>&1; echo "stopping..."' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); +-- start_ignore +select * from gpfdist2_stop; + x +------------- + stopping... +(1 row) + +select * from gpfdist2_start; + x +------------- + starting... +(1 row) + +-- end_ignore +--- test 1 using a little file +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; + count +------- + 256 +(1 row) + +DROP EXTERNAL TABLE ext_lineitem; +-- test 2 use a bigger file. 
+CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; + count +-------- + 100000 +(1 row) + +DROP EXTERNAL TABLE ext_lineitem; +-- test 3 line too long with defaults +CREATE EXTERNAL TABLE ext_test ( + id text, + stuff text + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/longline.txt' +) +FORMAT 'text' +( + DELIMITER AS ',' +) LOG ERRORS SEGMENT REJECT LIMIT 2 +; +SELECT count(*) FROM ext_test; +ERROR: gpfdist error - line too long in file @abs_srcdir@/data/gpfdist2/longline.txt near (0 bytes) (seg1 slice1 127.0.0.1:7001 pid=30337) +CONTEXT: External table ext_test, line 1 of file gpfdist://@hostname@:7070/gpfdist2/longline.txt +DROP EXTERNAL TABLE ext_test; +--test 4 using csv data +CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION ('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' (NEWLINE 'CRLF'); +SELECT count(*) FROM ext_crlf_with_lf_column; + count +------- + 10367 +(1 row) + +DROP EXTERNAL TABLE ext_crlf_with_lf_column; +-- test 5 use two urls. 
+CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long', + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; + count +-------- + 100256 +(1 row) + +DROP EXTERNAL TABLE ext_lineitem; +-- start_ignore +select * from gpfdist2_stop; + x +------------- + stopping... +(1 row) + +-- end_ignore +--test bigger buffer start +DROP EXTERNAL WEB TABLE IF EXISTS gpfdist2_start; +CREATE EXTERNAL WEB TABLE gpfdist2_start (x text) +execute E'((@bindir@/gpfdist -p 7070 -d @abs_srcdir@/data --compress -m 1200000 </dev/null >/dev/null 2>&1 &); for i in `seq 1 30`; do curl 127.0.0.1:7070 >/dev/null 2>&1 && break; sleep 1; done; echo "starting...") ' +on SEGMENT 0 +FORMAT 'text' (delimiter '|'); +-- start_ignore +select * from gpfdist2_start; + x +------------- + starting... +(1 row) + +-- end_ignore +-- test 1 using a little file +CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; + count +------- + 256 +(1 row) + +DROP EXTERNAL TABLE ext_lineitem; +-- test 2 use a bigger file. 
+CREATE EXTERNAL TABLE ext_lineitem ( + L_ORDERKEY INT8, + L_PARTKEY INTEGER, + L_SUPPKEY INTEGER, + L_LINENUMBER integer, + L_QUANTITY decimal, + L_EXTENDEDPRICE decimal, + L_DISCOUNT decimal, + L_TAX decimal, + L_RETURNFLAG CHAR(1), + L_LINESTATUS CHAR(1), + L_SHIPDATE date, + L_COMMITDATE date, + L_RECEIPTDATE date, + L_SHIPINSTRUCT CHAR(25), + L_SHIPMODE CHAR(10), + L_COMMENT VARCHAR(44) + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/lineitem.tbl.long' +) +FORMAT 'text' +( + DELIMITER AS '|' +) +; +SELECT count(*) FROM ext_lineitem; + count +-------- + 100000 +(1 row) + +DROP EXTERNAL TABLE ext_lineitem; +-- test 3 line too long with defaults +CREATE EXTERNAL TABLE ext_test ( + id text, + stuff text + ) +LOCATION +( + 'gpfdist://@hostname@:7070/gpfdist2/longline.txt' +) +FORMAT 'text' +( + DELIMITER AS ',' +) LOG ERRORS SEGMENT REJECT LIMIT 2 +; +SELECT count(*) FROM ext_test; + count +------- + 1 +(1 row) + +DROP EXTERNAL TABLE ext_test; +--test 4 using csv data +CREATE EXTERNAL TABLE ext_crlf_with_lf_column(c1 int, c2 text) LOCATION ('gpfdist://@hostname@:7070/gpfdist2/crlf_with_lf_column.csv') FORMAT 'csv' (NEWLINE 'CRLF'); +SELECT count(*) FROM ext_crlf_with_lf_column; + count +------- + 10367 +(1 row) + +DROP EXTERNAL TABLE ext_crlf_with_lf_column; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
