Changeset: 54e72812a957 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=54e72812a957
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        clients/mapilib/mapi.h
        monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:

Allow mclient to set compression method using --compression=[method] flag.


diffs (185 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2954,7 +2954,7 @@ usage(const char *prog, int xit)
        fprintf(stderr, " -E charset  | --encoding=charset specify encoding 
(character set) of the terminal\n");
 #endif
        fprintf(stderr, " -f kind     | --format=kind      specify output 
format {csv,tab,raw,sql,xml}\n");
-
+       fprintf(stderr, " -C version  | --compression=type specify compression 
method {snappy,lz4}\n");
        fprintf(stderr, " -P version  | --protocol=version specify protocol 
version {prot9,prot10,prot10compressed}\n");
        fprintf(stderr, " -B size     | --blocksize=size   specify protocol 
block size (>= %d)\n", BLOCK);
 
@@ -2996,6 +2996,7 @@ main(int argc, char **argv)
        char *dbname = NULL;
        char *output = NULL;    /* output format as string */
        char *protocol = NULL;
+       char *compression = NULL;
        size_t blocksize = 0;
        FILE *fp = NULL;
        int trace = 0;
@@ -3022,7 +3023,7 @@ main(int argc, char **argv)
                {"format", 1, 0, 'f'},
                {"protocol", 1, 0, 'P'},
                {"blocksize", 1, 0, 'B'},
-
+               {"compression", 1, 0, 'C'},
                {"help", 0, 0, '?'},
                {"history", 0, 0, 'H'},
                {"host", 1, 0, 'h'},
@@ -3162,6 +3163,12 @@ main(int argc, char **argv)
                                free(protocol);
                        protocol = strdup(optarg);
                        break;
+               case 'C':
+                       assert(optarg);
+                       if (compression != NULL)
+                               free(compression);
+                       compression = strdup(optarg);
+                       break;
                case 'B':
                        assert(optarg);
                        blocksize = (size_t) atol(optarg);
@@ -3326,6 +3333,11 @@ main(int argc, char **argv)
                        fprintf(stderr, "%s\n", mapi_error_str(mid));
                }
        }
+       if (compression) {
+               if (mapi_set_compression(mid, compression) != 0) {
+                       fprintf(stderr, "%s\n", mapi_error_str(mid));
+               }
+       }
 
        if (mid && mapi_error(mid) == MOK)
                mapi_reconnect(mid);    /* actually, initial connect */
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -906,6 +906,7 @@ struct MapiStruct {
        int languageId;
        char *motd;             /* welcome message from server */
        protocol_version protocol;
+       compression_method comp;
        size_t blocksize;
 
        int trace;              /* Trace Mapi interaction */
@@ -1895,6 +1896,7 @@ mapi_new(void)
        mid->username = NULL;
        mid->password = NULL;
 
+       mid->comp = COMPRESSION_SNAPPY;
        mid->protocol = protauto;
        mid->blocksize = 128 * BLOCK; // 1 MB
 
@@ -2223,6 +2225,7 @@ mapi_reconnect(Mapi mid)
        char *protover;
        char *rest;
        protocol_version prot_version = prot9;
+       compression_method comp = COMPRESSION_NONE;
 
        if (mid->connected)
                close_connection(mid);
@@ -2638,6 +2641,7 @@ mapi_reconnect(Mapi mid)
 #ifdef HAVE_LIBSNAPPY
                if (strstr(hashes, "PROT10COMPR")) {
                        // both server and client support compressed protocol 
10; use compressed version 
+                       comp = mid->comp;
                        if (mid->protocol == protauto) {
                                prot_version = prot10compressed;
                        } else {
@@ -2760,7 +2764,7 @@ mapi_reconnect(Mapi mid)
                        if (prot_version == prot10 || prot_version == 
prot10compressed) {
                                // if we are using protocol 10, we have to send 
either PROT10/PROT10COMPRESSED to the server
                                // so the server knows which protocol to use
-                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:%zu:\n",
+                               retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:%s:%s:%zu:\n",
        #ifdef WORDS_BIGENDIAN
                                     "BIG",
        #else
@@ -2769,6 +2773,7 @@ mapi_reconnect(Mapi mid)
                                     mid->username, hash, mid->language,
                                     mid->database == NULL ? "" : mid->database,
                                     prot_version == prot10 ? "PROT10" : 
"PROT10COMPR",
+                                    comp == COMPRESSION_SNAPPY ? "SNAPPY" : 
(comp == COMPRESSION_LZ4 ? "LZ4" : ""),
                                    mid->blocksize);
                        } else {
                                retval = snprintf(buf, BLOCK, 
"%s:%s:%s:%s:%s:\n",
@@ -2816,8 +2821,8 @@ mapi_reconnect(Mapi mid)
 
                if (prot_version == prot10compressed) {
 #ifdef HAVE_LIBSNAPPY
-                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, COMPRESSION_LZ4);
-                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, COMPRESSION_LZ4);
+                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, comp);
+                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, comp);
 #else
                        assert(0);
 #endif
@@ -5971,6 +5976,21 @@ MapiMsg mapi_set_protocol(Mapi mid, cons
        return 0;
 }
 
+
+MapiMsg mapi_set_compression(Mapi mid, const char* compression) {
+       if (strcasecmp(compression, "snappy") == 0) {
+               mid->comp = COMPRESSION_SNAPPY;
+       }
+       else if (strcasecmp(compression, "lz4") == 0) {
+               mid->comp = COMPRESSION_LZ4;
+       } else {
+               mapi_setError(mid, "invalid compression name", 
"mapi_set_compression", MERROR);
+               return -1;
+       }
+
+       return 0;
+}
+
 void mapi_set_blocksize(Mapi mid, size_t blocksize) {
        if (blocksize >= BLOCK) {
                mid->blocksize = blocksize;
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -234,6 +234,7 @@ mapi_export char *mapi_unquote(char *msg
 mapi_export MapiHdl mapi_get_active(Mapi mid);
 
 mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot);
+mapi_export MapiMsg mapi_set_compression(Mapi mid, const char* compression);
 mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
 
 #ifdef _MSC_VER
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -173,6 +173,7 @@ doChallenge(void *data)
        if (strstr(buf, "PROT10")) {
                char *buflenstrend, *buflenstr = strstr(buf, "PROT10");
                buflenstr = strchr(buflenstr, ':') + 1;
+               buflenstr = strchr(buflenstr, ':') + 1;
                if (!buflenstr) {
                        mnstr_printf(fdout, "!buffer size needs to be set and 
bigger than %d\n", BLOCK);
                        close_stream(fdin);
@@ -200,12 +201,21 @@ doChallenge(void *data)
                        fdin = block_stream2(bs_stream(fdin), buflen, 
COMPRESSION_NONE);
                        fdout = block_stream2(bs_stream(fdout), buflen, 
COMPRESSION_NONE);
                } else {
+                       compression_method comp;
+                       if (strstr(buf, "SNAPPY")) {
+                               comp = COMPRESSION_SNAPPY;
+                       } else if (strstr(buf, "LZ4")) {
+                               comp = COMPRESSION_LZ4;
+                       } else {
+                               fprintf(stderr, "Unrecognized compression 
type!\n");
+                               comp = COMPRESSION_SNAPPY;
+                       }
 #ifdef HAVE_LIBSNAPPY
                        // client requests switch to protocol 10
                        protocol = prot10compressed;
                        // compressed protocol 10
-                       fdin = block_stream2(bs_stream(fdin), buflen, 
COMPRESSION_LZ4);
-                       fdout = block_stream2(bs_stream(fdout), buflen, 
COMPRESSION_LZ4);
+                       fdin = block_stream2(bs_stream(fdin), buflen, comp);
+                       fdout = block_stream2(bs_stream(fdout), buflen, comp);
 #else
                        // client requested compressed protocol, but server 
does not support it
                        mnstr_printf(fdout, "!server does not support 
compressed protocol\n");
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to