Changeset: faf73bcc6423 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=faf73bcc6423
Modified Files:
clients/mapiclient/mclient.c
clients/mapilib/mapi.c
clients/mapilib/mapi.h
common/stream/stream.c
common/stream/stream.h
common/utils/mcrypt.c
monetdb5/modules/mal/mal_mapi.c
sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
Add extra --colcomp flag to mserver to specify column-type compression used (if
any).
diffs (truncated from 356 to 300 lines):
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -2957,6 +2957,7 @@ usage(const char *prog, int xit)
fprintf(stderr, " -C version | --compression=type specify compression
method {snappy,lz4}\n");
fprintf(stderr, " -P version | --protocol=version specify protocol
version {prot9,prot10,prot10compressed}\n");
fprintf(stderr, " -B size | --blocksize=size specify protocol
block size (>= %d)\n", BLOCK);
+ fprintf(stderr, " -c colcomp | --colcomp=type specify column
compression type {none,pfor}");
fprintf(stderr, " -H | --history load/save cmdline
history (default off)\n");
fprintf(stderr, " -i | --interactive[=tm] interpret `\\'
commands on stdin, use time formatting {ms,s,m}\n");
@@ -2997,6 +2998,7 @@ main(int argc, char **argv)
char *output = NULL; /* output format as string */
char *protocol = NULL;
char *compression = NULL;
+ char *colcomp = NULL;
size_t blocksize = 0;
FILE *fp = NULL;
int trace = 0;
@@ -3024,6 +3026,7 @@ main(int argc, char **argv)
{"protocol", 1, 0, 'P'},
{"blocksize", 1, 0, 'B'},
{"compression", 1, 0, 'C'},
+ {"colcomp", 1, 0, 'c'},
{"help", 0, 0, '?'},
{"history", 0, 0, 'H'},
{"host", 1, 0, 'h'},
@@ -3169,6 +3172,12 @@ main(int argc, char **argv)
free(compression);
compression = strdup(optarg);
break;
+ case 'c':
+ assert(optarg);
+ if (colcomp != NULL)
+ free(colcomp);
+ colcomp = strdup(optarg);
+ break;
case 'B':
assert(optarg);
blocksize = (size_t) atol(optarg);
@@ -3331,11 +3340,20 @@ main(int argc, char **argv)
if (protocol) {
if (mapi_set_protocol(mid, protocol) != 0) {
fprintf(stderr, "%s\n", mapi_error_str(mid));
+ exit(1);
}
}
if (compression) {
if (mapi_set_compression(mid, compression) != 0) {
fprintf(stderr, "%s\n", mapi_error_str(mid));
+ exit(1);
+ }
+ }
+
+ if (colcomp) {
+ if (mapi_set_column_compression(mid, colcomp) != 0) {
+ fprintf(stderr, "%s\n", mapi_error_str(mid));
+ exit(1);
}
}
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -911,6 +911,7 @@ struct MapiStruct {
char *motd; /* welcome message from server */
protocol_version protocol;
compression_method comp;
+ column_compression colcomp;
size_t blocksize;
int trace; /* Trace Mapi interaction */
@@ -1902,6 +1903,7 @@ mapi_new(void)
mid->comp = COMPRESSION_SNAPPY;
mid->protocol = protauto;
+ mid->colcomp = COLUMN_COMPRESSION_AUTO;
mid->blocksize = 128 * BLOCK; // 1 MB
mid->cachelimit = 100;
@@ -2641,6 +2643,23 @@ mapi_reconnect(Mapi mid)
*hash = '\0';
rest = hash + 1;
}
+#ifdef HAVE_PFOR
+ if (strstr(hashes, "PFOR")) {
+ if (mid->colcomp == COLUMN_COMPRESSION_AUTO) {
+ mid->colcomp = COLUMN_COMPRESSION_PFOR;
+ }
+ } else if (mid->colcomp == COLUMN_COMPRESSION_PFOR) {
+ mapi_setError(mid, "Client wants PFOR but server does
not support it",
+ "mapi_reconnect", MERROR);
+ close_connection(mid);
+ return mid->error;
+ }
+#else
+ if (mid->colcomp == COLUMN_COMPRESSION_PFOR) {
+ fprintf(stderr, "Client does not support PFOR
compression.\n");
+ exit(1);
+ }
+#endif
#ifdef HAVE_LIBSNAPPY
if (strstr(hashes, "PROT10COMPR")) {
// both server and client support compressed protocol
10; use compressed version
@@ -2767,7 +2786,7 @@ mapi_reconnect(Mapi mid)
if (prot_version == prot10 || prot_version ==
prot10compressed) {
// if we are using protocol 10, we have to send
either PROT10/PROT10COMPRESSED to the server
// so the server knows which protocol to use
- retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:%s:%s:%zu:\n",
+ retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:%s:%s%s:%zu:\n",
#ifdef WORDS_BIGENDIAN
"BIG",
#else
@@ -2777,6 +2796,11 @@ mapi_reconnect(Mapi mid)
mid->database == NULL ? "" : mid->database,
prot_version == prot10 ? "PROT10" :
"PROT10COMPR",
comp == COMPRESSION_SNAPPY ? "SNAPPY" :
(comp == COMPRESSION_LZ4 ? "LZ4" : ""),
+#ifdef HAVE_PFOR
+ mid->colcomp == COLUMN_COMPRESSION_PFOR ?
",HAVEPFOR" : "",
+#else
+ "",
+#endif
mid->blocksize);
} else {
retval = snprintf(buf, BLOCK,
"%s:%s:%s:%s:%s:\n",
@@ -2824,14 +2848,14 @@ mapi_reconnect(Mapi mid)
if (prot_version == prot10compressed) {
#ifdef HAVE_LIBSNAPPY
- mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, comp);
- mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, comp);
+ mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, comp, mid->colcomp);
+ mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, comp, mid->colcomp);
#else
assert(0);
#endif
} else {
- mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, COMPRESSION_NONE);
- mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, COMPRESSION_NONE);
+ mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, COMPRESSION_NONE, mid->colcomp);
+ mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, COMPRESSION_NONE, mid->colcomp);
}
// FIXME: this leaks a block stream header
@@ -5593,7 +5617,7 @@ mapi_fetch_row(MapiHdl hdl)
#endif
} else {
#ifdef HAVE_PFOR
- if
(strcasecmp(result->fields[i].columntype, "int") == 0) {
+ if (hdl->mid->colcomp ==
COLUMN_COMPRESSION_PFOR && strcasecmp(result->fields[i].columntype, "int") ==
0) {
lng b = *((lng*) buf);
buf += sizeof(lng);
lng length = *((lng*)(buf));
@@ -6009,7 +6033,8 @@ MapiMsg mapi_set_protocol(Mapi mid, cons
}
-MapiMsg mapi_set_compression(Mapi mid, const char* compression) {
+MapiMsg
+mapi_set_compression(Mapi mid, const char* compression) {
if (strcasecmp(compression, "snappy") == 0) {
mid->comp = COMPRESSION_SNAPPY;
}
@@ -6023,11 +6048,26 @@ MapiMsg mapi_set_compression(Mapi mid, c
return 0;
}
-void mapi_set_blocksize(Mapi mid, size_t blocksize) {
+void
+mapi_set_blocksize(Mapi mid, size_t blocksize) {
if (blocksize >= BLOCK) {
mid->blocksize = blocksize;
}
}
-
-
+MapiMsg
+mapi_set_column_compression(Mapi mid, const char* colcomp) {
+ if (strcasecmp(colcomp, "pfor") == 0) {
+ mid->colcomp = COLUMN_COMPRESSION_PFOR;
+ }
+ else if (strcasecmp(colcomp, "none") == 0) {
+ mid->colcomp = COLUMN_COMPRESSION_NONE;
+ } else {
+ mapi_setError(mid, "invalid column compression type",
"mapi_set_compression", MERROR);
+ return -1;
+ }
+
+ return 0;
+}
+
+
diff --git a/clients/mapilib/mapi.h b/clients/mapilib/mapi.h
--- a/clients/mapilib/mapi.h
+++ b/clients/mapilib/mapi.h
@@ -235,6 +235,7 @@ mapi_export MapiHdl mapi_get_active(Mapi
mapi_export MapiMsg mapi_set_protocol(Mapi mid, const char* prot);
mapi_export MapiMsg mapi_set_compression(Mapi mid, const char* compression);
+mapi_export MapiMsg mapi_set_column_compression(Mapi mid, const char* colcomp);
mapi_export void mapi_set_blocksize(Mapi mid, size_t blocksize);
#ifdef _MSC_VER
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -3982,6 +3982,7 @@ typedef struct bs2 {
size_t bufsiz;
size_t readpos;
compression_method comp;
+ column_compression colcomp;
char *compbuf;
size_t compbufsiz;
char buf[1]; /* the buffered data */
@@ -4460,6 +4461,12 @@ bs2_buffer(stream *ss) {
return b;
}
+column_compression
+bs2_colcomp(stream *ss) {
+ bs2 *s = (bs2 *) ss->stream_data.p;
+ return s->colcomp;
+}
+
int
isa_block_stream(stream *s)
{
@@ -4474,7 +4481,7 @@ isa_fixed_block_stream(stream *s) {
}
stream *
-block_stream2(stream *s, size_t bufsiz, compression_method comp)
+block_stream2(stream *s, size_t bufsiz, compression_method comp,
column_compression colcomp)
{
stream *ns;
bs2 *b;
@@ -4490,6 +4497,7 @@ block_stream2(stream *s, size_t bufsiz,
destroy(ns);
return NULL;
}
+ b->colcomp = colcomp;
/* blocksizes have a fixed little endian byteorder */
#ifdef WORDS_BIGENDIAN
s->byteorder = 3412; /* simply != 1234 */
diff --git a/common/stream/stream.h b/common/stream/stream.h
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -249,11 +249,17 @@ typedef enum {
COMPRESSION_UNKNOWN = 255
} compression_method;
-stream_export stream *block_stream2(stream *s, size_t bufsiz,
compression_method comp);
+typedef enum {
+ COLUMN_COMPRESSION_AUTO = 255,
+ COLUMN_COMPRESSION_NONE = 0,
+ COLUMN_COMPRESSION_PFOR = 1
+} column_compression;
+
+stream_export stream *block_stream2(stream *s, size_t bufsiz,
compression_method comp, column_compression colcomp);
stream_export void* bs2_getbuf(stream *ss);
stream_export void bs2_resetbuf(stream *ss);
stream_export buffer bs2_buffer(stream *s);
-
+column_compression bs2_colcomp(stream *ss);
/* read block of data including the end of block marker */
stream_export ssize_t mnstr_read_block(stream *s, void *buf, size_t elmsize,
size_t cnt);
diff --git a/common/utils/mcrypt.c b/common/utils/mcrypt.c
--- a/common/utils/mcrypt.c
+++ b/common/utils/mcrypt.c
@@ -34,13 +34,16 @@ mcrypt_getHashAlgorithms(void)
* Better/stronger/faster algorithms can be added in the future upon
* desire.
*/
+ return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10"
#ifdef HAVE_LIBSNAPPY
- // the server supports protocol 10 + compression
- return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10,PROT10COMPR");
-#else
- // the server only supports protocol 10
- return strdup("RIPEMD160,SHA256,SHA1,MD5,PROT10");
+// the server supports protocol 10 + compression
+ ",PROT10COMPR"
#endif
+#ifdef HAVE_PFOR
+// the server supports PFOR
+ ",PFOR"
+#endif
+ );
}
/**
diff --git a/monetdb5/modules/mal/mal_mapi.c b/monetdb5/modules/mal/mal_mapi.c
--- a/monetdb5/modules/mal/mal_mapi.c
+++ b/monetdb5/modules/mal/mal_mapi.c
@@ -121,6 +121,7 @@ doChallenge(void *data)
ssize_t len = 0;
protocol_version protocol = prot9;
size_t buflen = BLOCK;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list