Changeset: 54e72812a957 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=54e72812a957
Modified Files:
clients/mapiclient/mclient.c
clients/mapilib/mapi.c
clients/mapilib/mapi.h
monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:
Allow mclient to set compression method using --compression=[method] flag.
diffs (185 lines):
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2954,7 +2954,7 @@ usage(const char *prog, int xit)
fprintf(stderr, " -E charset | --encoding=charset specify encoding
(character set) of the terminal\n");
#endif
fprintf(stderr, " -f kind | --format=kind specify output
format {csv,tab,raw,sql,xml}\n");
-
+ fprintf(stderr, " -C version | --compression=type specify compression
method {snappy,lz4}\n");
fprintf(stderr, " -P version | --protocol=version specify protocol
version {prot9,prot10,prot10compressed}\n");
fprintf(stderr, " -B size | --blocksize=size specify protocol
block size (>= %d)\n", BLOCK);
@@ -2996,6 +2996,7 @@ main(int argc, char **argv)
char *dbname = NULL;
char *output = NULL; /* output format as string */
char *protocol = NULL;
+ char *compression = NULL;
size_t blocksize = 0;
FILE *fp = NULL;
int trace = 0;
@@ -3022,7 +3023,7 @@ main(int argc, char **argv)
{"format", 1, 0, 'f'},
{"protocol", 1, 0, 'P'},
{"blocksize", 1, 0, 'B'},
-
+ {"compression", 1, 0, 'C'},
{"help", 0, 0, '?'},
{"history", 0, 0, 'H'},
{"host", 1, 0, 'h'},
@@ -3162,6 +3163,12 @@ main(int argc, char **argv)
free(protocol);
protocol = strdup(optarg);
break;
+ case 'C':
+ assert(optarg);
+ if (compression != NULL)
+ free(compression);
+ compression = strdup(optarg);
+ break;
case 'B':
assert(optarg);
blocksize = (size_t) atol(optarg);
@@ -3326,6 +3333,11 @@ main(int argc, char **argv)
fprintf(stderr, "%s\n", mapi_error_str(mid));
}
}
+ if (compression) {
+ if (mapi_set_compression(mid, compression) != 0) {
+ fprintf(stderr, "%s\n", mapi_error_str(mid));
+ }
+ }
if (mid && mapi_error(mid) == MOK)
mapi_reconnect(mid); /* actually, initial connect */
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -906,6 +906,7 @@ struct MapiStruct {
int languageId;
char *motd; /* welcome message from server */
protocol_version protocol;
+ compression_method comp;
size_t blocksize;
int trace; /* Trace Mapi interaction */
@@ -1895,6 +1896,7 @@ mapi_new(void)
mid->username = NULL;
mid->password = NULL;
+ mid->comp = COMPRESSION_SNAPPY;
mid->protocol = protauto;
mid->blocksize = 128 * BLOCK; // 1 MB
@@ -2223,6 +2225,7 @@ mapi_reconnect(Mapi mid)
char *protover;
char *rest;
protocol_version prot_version = prot9;
+ compression_method comp = COMPRESSION_NONE;
if (mid->connected)
close_connection(mid);
@@ -2638,6 +2641,7 @@ mapi_reconnect(Mapi mid)
#ifdef HAVE_LIBSNAPPY
if (strstr(hashes, "PROT10COMPR")) {
// both server and client support compressed protocol
10; use compressed version
+ comp = mid->comp;
if (mid->protocol == protauto) {
prot_version = prot10compressed;
} else {
@@ -2760,7 +2764,7 @@ mapi_reconnect(Mapi mid)
if (prot_version == prot10 || prot_version ==
prot10compressed) {
// if we are using protocol 10, we have to send
either PROT10/PROT10COMPRESSED to the server
// so the server knows which protocol to use
- retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:%s:%zu:\n",
+ retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:%s:%s:%zu:\n",
#ifdef WORDS_BIGENDIAN
"BIG",
#else
@@ -2769,6 +2773,7 @@ mapi_reconnect(Mapi mid)
mid->username, hash, mid->language,
mid->database == NULL ? "" : mid->database,
prot_version == prot10 ? "PROT10" :
"PROT10COMPR",
+ comp == COMPRESSION_SNAPPY ? "SNAPPY" :
(comp == COMPRESSION_LZ4 ? "LZ4" : ""),
mid->blocksize);
} else {
retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:\n",
@@ -2816,8 +2821,8 @@ mapi_reconnect(Mapi mid)
if (prot_version == prot10compressed) {
#ifdef HAVE_LIBSNAPPY
- mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, COMPRESSION_LZ4);
- mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, COMPRESSION_LZ4);
+ mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, comp);
+ mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, comp);
#else
assert(0);
#endif
@@ -5971,6 +5976,21 @@ MapiMsg mapi_set_protocol(Mapi mid, cons
return 0;
}
+
+MapiMsg mapi_set_compression(Mapi mid, const char* compression) {
+ if (strcasecmp(compression, "snappy") == 0) {
+ mid->comp = COMPRESSION_SNAPPY;
+ }
+ else if (strcasecmp(compression, "lz4") == 0) {
+ mid->comp = COMPRESSION_LZ4;
+ } else {
+ mapi_setError(mid, "invalid compression name",
"mapi_set_compression", MERROR);
+ return -1;
+ }
+
+ return 0;
+}
+
void mapi_set_blocksize(Mapi mid, size_t blocksize) {
if (blocksize >= BLOCK) {
mid->blocksize = blocksize;
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -234,6 +234,7 @@ mapi_export char *mapi_unquote(char *msg
mapi_export MapiHdl mapi_get_active(Mapi mid);
mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot);
+mapi_export MapiMsg mapi_set_compression(Mapi mid, const char* compression);
mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
#ifdef _MSC_VER
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -173,6 +173,7 @@ doChallenge(void *data)
if (strstr(buf, "PROT10")) {
char *buflenstrend, *buflenstr = strstr(buf, "PROT10");
buflenstr = strchr(buflenstr, ':') + 1;
+ buflenstr = strchr(buflenstr, ':') + 1;
if (!buflenstr) {
mnstr_printf(fdout, "!buffer size needs to be set and
bigger than %d\n", BLOCK);
close_stream(fdin);
@@ -200,12 +201,21 @@ doChallenge(void *data)
fdin = block_stream2(bs_stream(fdin), buflen,
COMPRESSION_NONE);
fdout = block_stream2(bs_stream(fdout), buflen,
COMPRESSION_NONE);
} else {
+ compression_method comp;
+ if (strstr(buf, "SNAPPY")) {
+ comp = COMPRESSION_SNAPPY;
+ } else if (strstr(buf, "LZ4")) {
+ comp = COMPRESSION_LZ4;
+ } else {
+ fprintf(stderr, "Unrecognized compression
type!\n");
+ comp = COMPRESSION_SNAPPY;
+ }
#ifdef HAVE_LIBSNAPPY
// client requests switch to protocol 10
protocol = prot10compressed;
// compressed protocol 10
- fdin = block_stream2(bs_stream(fdin), buflen,
COMPRESSION_LZ4);
- fdout = block_stream2(bs_stream(fdout), buflen,
COMPRESSION_LZ4);
+ fdin = block_stream2(bs_stream(fdin), buflen, comp);
+ fdout = block_stream2(bs_stream(fdout), buflen, comp);
#else
// client requested compressed protocol, but server
does not support it
mnstr_printf(fdout, "!server does not support
compressed protocol\n");
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list