Changeset: 1efcfa2e1b7d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1efcfa2e1b7d
Modified Files:
clients/mapilib/mapi.c
common/stream/Makefile.ag
common/stream/stream.c
common/stream/stream.h
configure.ag
monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:
Add support for LZ4 compression.
diffs (truncated from 400 to 300 lines):
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2816,8 +2816,8 @@ mapi_reconnect(Mapi mid)
if (prot_version == prot10compressed) {
#ifdef HAVE_LIBSNAPPY
- mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, COMPRESSION_SNAPPY);
- mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, COMPRESSION_SNAPPY);
+ mid->to = block_stream2(bs_stream(mid->to),
mid->blocksize, COMPRESSION_LZ4);
+ mid->from = block_stream2(bs_stream(mid->from),
mid->blocksize, COMPRESSION_LZ4);
#else
assert(0);
#endif
@@ -4003,11 +4003,11 @@ static char* mapi_convert_varchar(struct
static char* itoa(int i, char b[]){
char const digit[] = "0123456789";
char* p = b;
+ int shifter = i;
if(i<0){
*p++ = '-';
i *= -1;
}
- int shifter = i;
do{ //Move to where representation ends
++p;
shifter = shifter/10;
diff --git a/common/stream/Makefile.ag b/common/stream/Makefile.ag
--- a/common/stream/Makefile.ag
+++ b/common/stream/Makefile.ag
@@ -11,6 +11,7 @@ MTSAFE
INCLUDES = $(zlib_CFLAGS) \
$(BZ_CFLAGS) \
$(snappy_CFLAGS) \
+ $(lz4_CFLAGS) \
$(liblzma_CFLAGS) \
$(openssl_CFLAGS) \
$(curl_CFLAGS)
@@ -22,6 +23,7 @@ lib_stream = {
$(zlib_LIBS) \
$(BZ_LIBS) \
$(snappy_LIBS) \
+ $(lz4_LIBS) \
$(liblzma_LIBS) \
$(openssl_LIBS) \
$(curl_LIBS) \
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -96,6 +96,9 @@
#ifdef HAVE_LIBSNAPPY
#include <snappy-c.h> // C forever
#endif
+#ifdef HAVE_LIBLZ4
+#include <lz4.h>
+#endif
#ifdef HAVE_ICONV
#ifdef HAVE_ICONV_H
@@ -3985,11 +3988,98 @@ typedef struct bs2 {
* size-short */
} bs2;
+
+static ssize_t
+compress_stream_data(bs2 *s) {
+ assert(s->comp != COMPRESSION_NONE);
+ if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+ size_t compressed_length = s->compbufsiz;
+ snappy_status ret;
+ if ((ret = snappy_compress(s->buf, s->nr, s->compbuf,
&compressed_length)) != SNAPPY_OK) {
+ s->s->errnr = (int) ret;
+ return -1;
+ }
+ return compressed_length;
+#else
+ assert(0);
+ return -1;
+#endif
+ } else if (s->comp == COMPRESSION_LZ4) {
+#ifdef HAVE_LIBLZ4
+ int compressed_length = (int) s->compbufsiz;
+ if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf,
s->nr, compressed_length, 1)) == 0) {
+ s->s->errnr = -1;
+ return -1;
+ }
+ return compressed_length;
+#else
+ assert(0);
+ return -1;
+#endif
+ }
+ return -1;
+}
+
+
+static ssize_t
+decompress_stream_data(bs2 *s) {
+ assert(s->comp != COMPRESSION_NONE);
+ if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+ snappy_status ret;
+ size_t uncompressed_length = s->bufsiz;
+ if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf,
&uncompressed_length)) != SNAPPY_OK) {
+ s->s->errnr = (int) ret;
+ return -1;
+ }
+ return (ssize_t) uncompressed_length;
+#else
+ assert(0);
+ return -1;
+#endif
+ } else if (s->comp == COMPRESSION_LZ4) {
+ int uncompressed_length = (int) s->bufsiz;
+#ifdef HAVE_LIBLZ4
+ if ((uncompressed_length = LZ4_decompress_safe(s->compbuf,
s->buf, s->itotal, uncompressed_length)) <= 0) {
+ s->s->errnr = uncompressed_length;
+ return -1;
+ }
+ return uncompressed_length;
+#else
+ assert(0);
+ return -1;
+#endif
+ }
+ return -1;
+}
+
+static ssize_t
+compression_size_bound(bs2 *s) {
+ if (s->comp == COMPRESSION_NONE) {
+ return 0;
+ } else if (s->comp == COMPRESSION_SNAPPY) {
+#ifndef HAVE_LIBSNAPPY
+ return -1;
+#else
+ return snappy_max_compressed_length(s->bufsiz);
+#endif
+ } else if (s->comp == COMPRESSION_LZ4) {
+#ifndef HAVE_LIBLZ4
+ return -1;
+#else
+ return LZ4_compressBound(s->bufsiz);
+#endif
+ }
+ return -1;
+}
+
static bs2 *
bs2_create(stream *s, size_t bufsiz, compression_method comp)
{
/* should be a binary stream */
bs2 *ns;
+ ssize_t compress_bound = 0;
if ((ns = malloc(sizeof(*ns) + bufsiz)) == NULL)
return NULL;
@@ -3999,18 +4089,18 @@ bs2_create(stream *s, size_t bufsiz, com
ns->bufsiz = bufsiz;
ns->comp = comp;
ns->compbuf = NULL;
- if (comp == COMPRESSION_SNAPPY) {
-#ifndef HAVE_LIBSNAPPY
- free(ns);
- return NULL;
-#else
- ns->compbufsiz = snappy_max_compressed_length(ns->bufsiz);
+
+ compress_bound = compression_size_bound(ns);
+ if (compress_bound > 0) {
+ ns->compbufsiz = compress_bound;
ns->compbuf = malloc(ns->compbufsiz);
if (!ns->compbuf) {
free(ns);
return NULL;
}
-#endif
+ } else if (compress_bound < 0) {
+ free(ns);
+ return NULL;
}
return ns;
}
@@ -4069,23 +4159,17 @@ bs2_write(stream *ss, const void *buf, s
blksize = s->nr;
writebuf = s->buf;
- if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
- snappy_status ret;
- size_t compressed_length = s->compbufsiz;
- if ((ret = snappy_compress(s->buf, s->nr,
s->compbuf, &compressed_length)) != SNAPPY_OK) {
- ss->errnr = (int) ret;
+ if (s->comp != COMPRESSION_NONE) {
+ ssize_t compressed_length =
compress_stream_data(s);
+ if (compressed_length < 0) {
return -1;
}
writebuf = s->compbuf;
blksize = (lng) compressed_length;
writelen = compressed_length;
-#else
- assert(0);
- return -1;
-#endif
}
+
/* the last bit tells whether a flush is in there, it's
not
* at this moment, so shift it to the left */
blksize <<= 1;
@@ -4140,21 +4224,14 @@ bs2_flush(stream *ss)
blksize = s->nr;
writebuf = s->buf;
- if (s->nr > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
- size_t compressed_length = s->compbufsiz;
- snappy_status ret;
- if ((ret = snappy_compress(s->buf, s->nr, s->compbuf,
&compressed_length)) != SNAPPY_OK) {
- ss->errnr = (int) ret;
+ if (s->nr > 0 && s->comp != COMPRESSION_NONE) {
+ ssize_t compressed_length = compress_stream_data(s);
+ if (compressed_length < 0) {
return -1;
}
writebuf = s->compbuf;
blksize = (lng) compressed_length;
writelen = compressed_length;
-#else
- assert(0);
- return -1;
-#endif
}
/* indicate that this is the last buffer of a block by
@@ -4235,13 +4312,12 @@ bs2_read(stream *ss, void *buf, size_t e
if (s->itotal > 0) {
// read everything into the comp buf
- size_t uncompressed_length = s->bufsiz;
+ ssize_t uncompressed_length = s->bufsiz;
size_t m = 0;
char *buf = s->buf;
if (s->comp != COMPRESSION_NONE) {
buf = s->compbuf;
}
- snappy_status ret;
while (m < s->itotal) {
ssize_t bytes_read = 0;
@@ -4252,16 +4328,12 @@ bs2_read(stream *ss, void *buf, size_t e
}
m += bytes_read;
}
- if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
- if ((ret = snappy_uncompress(s->compbuf,
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
- ss->errnr = (int) ret;
- return -1;
- }
-#else
- assert(0);
- return -1;
-#endif
+ if (s->comp != COMPRESSION_NONE) {
+ uncompressed_length =
decompress_stream_data(s);
+ if (uncompressed_length < 0) {
+ ss->errnr = (int) uncompressed_length;
+ return -1;
+ }
} else {
uncompressed_length = m;
}
@@ -4317,13 +4389,12 @@ bs2_read(stream *ss, void *buf, size_t e
if (s->itotal > 0) {
// read everything into the comp buf
- size_t uncompressed_length = s->bufsiz;
+ ssize_t uncompressed_length = s->bufsiz;
size_t m = 0;
char *buf = s->buf;
if (s->comp != COMPRESSION_NONE) {
buf = s->compbuf;
}
- snappy_status ret;
while (m < s->itotal) {
ssize_t bytes_read = 0;
@@ -4334,16 +4405,12 @@ bs2_read(stream *ss, void *buf, size_t e
}
m += bytes_read;
}
- if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
- if ((ret =
snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) !=
SNAPPY_OK) {
- ss->errnr = (int) ret;
- return -1;
- }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list