Changeset: 1efcfa2e1b7d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1efcfa2e1b7d
Modified Files:
        clients/mapilib/mapi.c
        common/stream/Makefile.ag
        common/stream/stream.c
        common/stream/stream.h
        configure.ag
        monetdb5/modules/mal/mal_mapi.c
Branch: protocol
Log Message:

Add support for LZ4 compression.


diffs (truncated from 400 to 300 lines):

diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -2816,8 +2816,8 @@ mapi_reconnect(Mapi mid)
 
                if (prot_version == prot10compressed) {
 #ifdef HAVE_LIBSNAPPY
-                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, COMPRESSION_SNAPPY);
-                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, COMPRESSION_SNAPPY);
+                       mid->to = block_stream2(bs_stream(mid->to), 
mid->blocksize, COMPRESSION_LZ4);
+                       mid->from = block_stream2(bs_stream(mid->from), 
mid->blocksize, COMPRESSION_LZ4);
 #else
                        assert(0);
 #endif
@@ -4003,11 +4003,11 @@ static char* mapi_convert_varchar(struct
 static char* itoa(int i, char b[]){
     char const digit[] = "0123456789";
     char* p = b;
+    int shifter = i;
     if(i<0){
         *p++ = '-';
         i *= -1;
     }
-    int shifter = i;
     do{ //Move to where representation ends
         ++p;
         shifter = shifter/10;
diff --git a/common/stream/Makefile.ag b/common/stream/Makefile.ag
--- a/common/stream/Makefile.ag
+++ b/common/stream/Makefile.ag
@@ -11,6 +11,7 @@ MTSAFE
 INCLUDES = $(zlib_CFLAGS) \
                   $(BZ_CFLAGS) \
                   $(snappy_CFLAGS) \
+                  $(lz4_CFLAGS) \
                   $(liblzma_CFLAGS) \
                   $(openssl_CFLAGS) \
                   $(curl_CFLAGS)
@@ -22,6 +23,7 @@ lib_stream  =  {
                   $(zlib_LIBS) \
                   $(BZ_LIBS) \
                   $(snappy_LIBS) \
+                  $(lz4_LIBS) \
                   $(liblzma_LIBS) \
                   $(openssl_LIBS) \
                   $(curl_LIBS) \
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -96,6 +96,9 @@
 #ifdef HAVE_LIBSNAPPY
 #include <snappy-c.h> // C forever
 #endif
+#ifdef HAVE_LIBLZ4
+#include <lz4.h>
+#endif
 
 #ifdef HAVE_ICONV
 #ifdef HAVE_ICONV_H
@@ -3985,11 +3988,98 @@ typedef struct bs2 {
                                 * size-short */
 } bs2;
 
+
+static ssize_t 
+compress_stream_data(bs2 *s) {
+       assert(s->comp != COMPRESSION_NONE);
+       if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+               size_t compressed_length = s->compbufsiz;
+               snappy_status ret;
+               if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, 
&compressed_length)) != SNAPPY_OK) {
+                       s->s->errnr = (int) ret;
+                       return -1;
+               }
+               return compressed_length;
+#else
+               assert(0);
+               return -1;
+#endif
+       } else if (s->comp == COMPRESSION_LZ4) {
+#ifdef HAVE_LIBLZ4
+               int compressed_length = (int) s->compbufsiz;
+               if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf, 
s->nr, compressed_length, 1)) == 0) {
+                       s->s->errnr = -1;
+                       return -1;
+               }
+               return compressed_length;
+#else
+               assert(0);
+               return -1;
+#endif
+       }
+       return -1;
+}
+
+
+static ssize_t 
+decompress_stream_data(bs2 *s) {
+       assert(s->comp != COMPRESSION_NONE);
+       if (s->comp == COMPRESSION_SNAPPY) {
+#ifdef HAVE_LIBSNAPPY
+               snappy_status ret;
+               size_t uncompressed_length = s->bufsiz;
+               if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, 
&uncompressed_length)) != SNAPPY_OK) {
+                       s->s->errnr = (int) ret;
+                       return -1;
+               }
+               return (ssize_t) uncompressed_length;
+#else
+               assert(0);
+               return -1;
+#endif
+       } else if (s->comp == COMPRESSION_LZ4) {
+               int uncompressed_length = (int) s->bufsiz;
+#ifdef HAVE_LIBLZ4
+               if ((uncompressed_length = LZ4_decompress_safe(s->compbuf, 
s->buf, s->itotal, uncompressed_length)) <= 0) {
+                       s->s->errnr = uncompressed_length;
+                       return -1;
+               }
+               return uncompressed_length;
+#else
+               assert(0);
+               return -1;
+#endif
+       }
+       return -1;
+}
+
+static ssize_t
+compression_size_bound(bs2 *s) {
+       if (s->comp == COMPRESSION_NONE) {
+               return 0;
+       } else if (s->comp == COMPRESSION_SNAPPY) {
+#ifndef HAVE_LIBSNAPPY
+               return -1;
+#else
+               return snappy_max_compressed_length(s->bufsiz);
+#endif
+       } else if (s->comp == COMPRESSION_LZ4) {
+#ifndef HAVE_LIBLZ4
+               return -1;
+#else
+               return LZ4_compressBound(s->bufsiz);
+#endif
+       }
+       return -1;
+}
+
 static bs2 *
 bs2_create(stream *s, size_t bufsiz, compression_method comp)
 {
        /* should be a binary stream */
        bs2 *ns;
+       ssize_t compress_bound = 0;
 
        if ((ns = malloc(sizeof(*ns) + bufsiz)) == NULL)
                return NULL;
@@ -3999,18 +4089,18 @@ bs2_create(stream *s, size_t bufsiz, com
        ns->bufsiz = bufsiz;
        ns->comp = comp;
        ns->compbuf = NULL;
-       if (comp == COMPRESSION_SNAPPY) {
-#ifndef HAVE_LIBSNAPPY
-               free(ns);
-               return NULL;
-#else
-               ns->compbufsiz = snappy_max_compressed_length(ns->bufsiz);
+
+       compress_bound = compression_size_bound(ns);
+       if (compress_bound > 0) {
+               ns->compbufsiz = compress_bound;
                ns->compbuf = malloc(ns->compbufsiz);
                if (!ns->compbuf) {
                        free(ns);
                        return NULL;
                }
-#endif
+       } else if (compress_bound < 0) {
+               free(ns);
+               return NULL;
        }
        return ns;
 }
@@ -4069,23 +4159,17 @@ bs2_write(stream *ss, const void *buf, s
                        blksize = s->nr;
                        writebuf = s->buf;
 
-                       if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
-                               snappy_status ret;
-                               size_t compressed_length = s->compbufsiz;
-                               if ((ret = snappy_compress(s->buf, s->nr, 
s->compbuf, &compressed_length)) != SNAPPY_OK) {
-                                       ss->errnr = (int) ret;
+                       if (s->comp != COMPRESSION_NONE) {
+                               ssize_t compressed_length = 
compress_stream_data(s);
+                               if (compressed_length < 0) {
                                        return -1;
                                }
                                writebuf = s->compbuf;
                                blksize = (lng) compressed_length;
                                writelen = compressed_length;
-#else
-                               assert(0);
-                               return -1;
-#endif
                        }
 
+
                        /* the last bit tells whether a flush is in there, it's 
not
                         * at this moment, so shift it to the left */
                        blksize <<= 1;
@@ -4140,21 +4224,14 @@ bs2_flush(stream *ss)
                blksize  = s->nr;
                writebuf = s->buf;
 
-               if (s->nr > 0 && s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
-                       size_t compressed_length = s->compbufsiz;
-                       snappy_status ret;
-                       if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, 
&compressed_length)) != SNAPPY_OK) {
-                               ss->errnr = (int) ret;
+               if (s->nr > 0 && s->comp != COMPRESSION_NONE) {
+                       ssize_t compressed_length = compress_stream_data(s);
+                       if (compressed_length < 0) {
                                return -1;
                        }
                        writebuf = s->compbuf;
                        blksize = (lng) compressed_length;
                        writelen = compressed_length;
-#else
-                       assert(0);
-                       return -1;
-#endif
                }
 
                /* indicate that this is the last buffer of a block by
@@ -4235,13 +4312,12 @@ bs2_read(stream *ss, void *buf, size_t e
 
                if (s->itotal > 0) {
                        // read everything into the comp buf
-                       size_t uncompressed_length = s->bufsiz;
+                       ssize_t uncompressed_length = s->bufsiz;
                        size_t m = 0;
                        char *buf = s->buf;
                        if (s->comp != COMPRESSION_NONE) {
                                buf = s->compbuf;
                        }
-                       snappy_status ret;
 
                        while (m < s->itotal) {
                                ssize_t bytes_read = 0;
@@ -4252,16 +4328,12 @@ bs2_read(stream *ss, void *buf, size_t e
                                }
                                m += bytes_read;
                        }
-                       if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
-                               if ((ret = snappy_uncompress(s->compbuf, 
s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
-                                       ss->errnr = (int) ret;
-                                       return -1;
-                               }
-#else
-               assert(0);
-               return -1;
-#endif
+                       if (s->comp != COMPRESSION_NONE) {
+                                uncompressed_length = 
decompress_stream_data(s);
+                                if (uncompressed_length < 0) {
+                                       ss->errnr = (int) uncompressed_length;
+                                       return -1;
+                                }
                        } else {
                                uncompressed_length = m;
                        }
@@ -4317,13 +4389,12 @@ bs2_read(stream *ss, void *buf, size_t e
 
                        if (s->itotal > 0) {
                                // read everything into the comp buf
-                               size_t uncompressed_length = s->bufsiz;
+                               ssize_t uncompressed_length = s->bufsiz;
                                size_t m = 0;
                                char *buf = s->buf;
                                if (s->comp != COMPRESSION_NONE) {
                                        buf = s->compbuf;
                                }
-                               snappy_status ret;
 
                                while (m < s->itotal) {
                                        ssize_t bytes_read = 0;
@@ -4334,16 +4405,12 @@ bs2_read(stream *ss, void *buf, size_t e
                                        }
                                        m += bytes_read;
                                }
-                               if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_LIBSNAPPY
-                                       if ((ret = 
snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != 
SNAPPY_OK) {
-                                               ss->errnr = (int) ret;
-                                               return -1;
-                                       }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to