Changeset: bb3e7d1e72e1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/bb3e7d1e72e1
Added Files:
        sql/test/copy/Tests/compressed-on-client-bz2.SQL.py
        sql/test/copy/Tests/compressed-on-client-gz.SQL.py
        sql/test/copy/Tests/compressed-on-client-lz4.SQL.py
        sql/test/copy/Tests/compressed-on-client-xz.SQL.py
        sql/test/copy/Tests/compressed_on_client.py
Modified Files:
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql.h
        sql/backends/monet5/sql_bincopy.c
        sql/server/sql_parser.y
        sql/server/sql_symbol.c
        sql/server/sql_symbol.h
        sql/test/copy/Tests/All
Branch: copyfaster
Log Message:

Support compressed ON CLIENT

Syntax: ON '{algo}' CLIENT with algo=lz4/gz/xz/bz2

This makes the server decompress incoming and compress outgoing ON
CLIENT streams.

The client is not aware of any difference. In particular, you must
make sure that the client does not already decompress the data,
or compress it a second time!


diffs (truncated from 438 to 300 lines):

diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -2520,6 +2520,51 @@ mvc_result_set_wrap( Client cntxt, MalBl
       return msg;
 }
 
+
+/* Wrap *inner in a compression stream for ON '{algo}' CLIENT transfers.
+ * nr is the on_client code produced by opt_on_location in sql_parser.y:
+ * values <= 1 (not ON CLIENT, plain ON CLIENT) leave *inner untouched and
+ * 11..14 select gz, bz2, xz and lz4.  On success *inner is replaced by the
+ * wrapping stream; on failure an exception is returned, *inner unchanged.
+ * NOTE(review): if the wrapper was created but is in an error state,
+ * close_stream(cs) may also close *inner, which callers close again on
+ * error -- confirm stream ownership. */
+str
+wrap_onclient_compression(stream **inner, str context, int nr)
+{
+       if (nr <= 1)
+               return MAL_SUCCEED;
+
+       // these numbers match those in sql_parser.y's opt_on_location.
+       stream *s = *inner;
+       stream *cs;
+       switch (nr) {
+               case 11:
+                       cs = gz_stream(s, 0);
+                       break;
+               case 12:
+                       cs = bz2_stream(s, 0);
+                       break;
+               case 13:
+                       cs = xz_stream(s, 0);
+                       break;
+               case 14:
+                       cs = lz4_stream(s, 0);
+                       break;
+               default:
+                       throw(IO, context, SQLSTATE(42000) "compression algo id not found");
+       }
+       if (cs == NULL || mnstr_errnr(cs) != MNSTR_NO__ERROR) {
+               str msg = createException(IO, context, SQLSTATE(42000) "%s", mnstr_peek_error(cs));
+               close_stream(cs);
+               return msg;
+       }
+       *inner = cs;
+       return MAL_SUCCEED;
+}
+
+
+
 /* Copy the result set into a CSV file */
 str
 mvc_export_table_wrap( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
@@ -2627,6 +2672,11 @@ mvc_export_table_wrap( Client cntxt, Mal
                       close_stream(s);
                       goto wrapup_result_set1;
               }
+               msg = wrap_onclient_compression(&s, "sql.export_table", onclient);
+               if (msg != NULL) {
+                       close_stream(s);
+                       goto wrapup_result_set1;
+               }
               be->output_format = OFMT_CSV;
       }
       if ((ok = mvc_export_result(cntxt->sqlcontext, s, res, tostdout, cntxt->qryctx.starttime, mb->optimize)) < 0) {
@@ -3158,6 +3208,11 @@ mvc_import_table_wrap(Client cntxt, MalB
                       close_stream(ss);
                       return msg;
               }
+               msg = wrap_onclient_compression(&ss, "sql.copy_from", onclient);
+               if (msg != NULL) {
+                       close_stream(ss);
+                       return msg;
+               }
 
               if (!strNil(fixed_widths)) {
                       size_t ncol = 0, current_width_entry = 0, i;
diff --git a/sql/backends/monet5/sql.h b/sql/backends/monet5/sql.h
--- a/sql/backends/monet5/sql.h
+++ b/sql/backends/monet5/sql.h
@@ -93,6 +93,7 @@ extern str mvc_export_chunk_wrap(Client 
 extern str mvc_export_operation_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 extern str mvc_scalar_value_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 extern str mvc_row_result_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+extern str wrap_onclient_compression(stream **inner, str context, int nr);
 extern str mvc_export_row_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 extern str mvc_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 extern str mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/sql/backends/monet5/sql_bincopy.c b/sql/backends/monet5/sql_bincopy.c
--- a/sql/backends/monet5/sql_bincopy.c
+++ b/sql/backends/monet5/sql_bincopy.c
@@ -272,9 +272,12 @@ import_column(backend *be, bat *ret, BUN
       } else {
               s = open_rstream(path);
       }
-       if (!s) {
-               bailout("%s", mnstr_peek_error(NULL));
+       if (s == NULL || mnstr_errnr(s) != MNSTR_NO__ERROR) {
+               bailout("%s", mnstr_peek_error(s));
       }
+       msg = wrap_onclient_compression(&s, "sql.copy_from", onclient);
+       if (msg != NULL)
+               goto end;
 
       // Do the work
       msg = load_column(rec, path, bat, s, width, byteswap, nrows, &eof_reached);
@@ -431,7 +434,7 @@ dump_binary_column(const struct type_rec
 
 
 static str
-export_column(backend *be, BAT *b, bool byteswap, str filename, bool onclient)
+export_column(backend *be, BAT *b, bool byteswap, str filename, int onclient)
 {
       static const char mal_operator[] = "sql.export_bin_column";
       str msg = MAL_SUCCEED;
@@ -445,14 +448,16 @@ export_column(backend *be, BAT *b, bool 
               bailout("COPY INTO BINARY not implemented for '%s'", gdk_name);
 
       if (onclient) {
-               (void)be;
               s = mapi_request_download(filename, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
       } else {
               s = open_wstream(filename);
       }
-       if (!s) {
-               bailout("%s", mnstr_peek_error(NULL));
+       if (s == NULL || mnstr_errnr(s) != MNSTR_NO__ERROR) {
+               bailout("%s", mnstr_peek_error(s));
       }
+       msg = wrap_onclient_compression(&s, "sql.export_bin_column", onclient);
+       if (msg != NULL)
+               goto end;
 
       msg = dump_binary_column(rec, b, 0, BATcount(b), byteswap, s);
 
@@ -481,7 +486,7 @@ mvc_bin_export_column_wrap(Client cntxt,
       // arg 1 handled below
       bool byteswap = *getArgReference_bit(stk, pci, 2);
       str filename = *getArgReference_str(stk, pci, 3);
-       bool onclient = (bool) *getArgReference_int(stk, pci, 4);
+       int onclient = *getArgReference_int(stk, pci, 4);
 
       // Usually we are called with a BAT argument but if the user types
       // something like
diff --git a/sql/server/sql_parser.y b/sql/server/sql_parser.y
--- a/sql/server/sql_parser.y
+++ b/sql/server/sql_parser.y
@@ -3189,10 +3189,24 @@ opt_to_savepoint:
       ;
 
 opt_on_location:
-               /* empty */ { $$ = 0; }
-       |       ON CLIENT   { $$ = 1; }
-       |       ON SERVER   { $$ = 0; }
-       ;
+               /* empty */     { $$ = 0; }
+       | ON SERVER             { $$ = 0; }
+       | ON CLIENT             { $$ = 1; }
+       | ON string CLIENT {    /* codes 11..14 must match wrap_onclient_compression() in sql.c */
+               if (strcmp($2, "gz") == 0)
+                       $$ = 11;
+               else if (strcmp($2, "bz2") == 0)
+                       $$ = 12;
+               else if (strcmp($2, "xz") == 0)
+                       $$ = 13;
+               else if (strcmp($2, "lz4") == 0)
+                       $$ = 14;
+               else {
+                       yyerror(m, "unknown ON CLIENT compression algorithm");
+                       YYABORT;
+               }
+       }
+       ;
 
 import_stmt:
               copyfrom_stmt    { $$ = $1; }
@@ -3210,7 +3224,7 @@ copyfrom_stmt:
                                                                                
                 $7, /* sources */
                                                                                
                 $8, /* header_list */
                                                                                
                 $2  /* nr_offset */);
-                       copy->on_client = !!$9;
+                       copy->on_client = $9;
                        $$ = (symbol*)copy;
                }
     /*  1    2      3    4     5               6    7     8 */
diff --git a/sql/server/sql_symbol.c b/sql/server/sql_symbol.c
--- a/sql/server/sql_symbol.c
+++ b/sql/server/sql_symbol.c
@@ -302,7 +302,7 @@ newCopyFromNode(allocator *sa, struct dl
                        .null_string = NULL,
                        .best_effort = false,
                        .fwf_widths = NULL,
-                       .on_client = false,
+                       .on_client = 0,
                        .escape = true,
                        .decsep = ".",
                        .decskip = NULL,
diff --git a/sql/server/sql_symbol.h b/sql/server/sql_symbol.h
--- a/sql/server/sql_symbol.h
+++ b/sql/server/sql_symbol.h
@@ -105,7 +105,7 @@ typedef struct CopyFromNode {
        char *null_string; /* default NULL, not "NULL"! */
        bool best_effort; /* default false */
        struct dlist *fwf_widths; /* must be NULL for FROM STDIN */
-       bool on_client; /* must be false for FROM STDIN */
+       int on_client; /* 1 is yes, >1 is compressed, always 0 for FROM STDIN */
        bool escape; /* default true */
        char *decsep; /* default "." */
        char *decskip; /* default NULL */
diff --git a/sql/test/copy/Tests/All b/sql/test/copy/Tests/All
--- a/sql/test/copy/Tests/All
+++ b/sql/test/copy/Tests/All
@@ -19,3 +19,7 @@ no_escape2
 crlf_normalization
 select-from-file
 flexible-order
+HAVE_LIBZ?compressed-on-client-gz
+HAVE_LIBBZ2?compressed-on-client-bz2
+HAVE_LIBLZMA?compressed-on-client-xz
+HAVE_LIBLZ4&HAVE_PYTHON_LZ4?compressed-on-client-lz4
diff --git a/sql/test/copy/Tests/compressed-on-client-bz2.SQL.py b/sql/test/copy/Tests/compressed-on-client-bz2.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/copy/Tests/compressed-on-client-bz2.SQL.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+
+from io import BytesIO
+from compressed_on_client import test_compressed_onclient
+
+from bz2 import BZ2File
+
+def compress(data):
+    bio = BytesIO()
+    with BZ2File(bio, 'wb') as bz:
+        bz.write(data)
+    return bio.getvalue()
+
+def decompress(data):
+    with BZ2File(BytesIO(data), 'rb') as bz:
+        return bz.read(1_000_000)
+
+test_compressed_onclient('bz2', compress, decompress)
diff --git a/sql/test/copy/Tests/compressed-on-client-gz.SQL.py b/sql/test/copy/Tests/compressed-on-client-gz.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/copy/Tests/compressed-on-client-gz.SQL.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+
+from io import BytesIO
+from compressed_on_client import test_compressed_onclient
+
+from gzip import GzipFile
+
+def compress(data):
+    bio = BytesIO()
+    with GzipFile(mode='wb', fileobj=bio) as gz:
+        gz.write(data)
+    return bio.getvalue()
+
+def decompress(data):
+    with GzipFile(mode='rb', fileobj=BytesIO(data)) as gz:
+        return gz.read(1_000_000)
+
+test_compressed_onclient('gz', compress, decompress)
diff --git a/sql/test/copy/Tests/compressed-on-client-lz4.SQL.py b/sql/test/copy/Tests/compressed-on-client-lz4.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/copy/Tests/compressed-on-client-lz4.SQL.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+
+from io import BytesIO
+from compressed_on_client import test_compressed_onclient
+
+from lz4.frame import LZ4FrameFile
+
+def compress(data):
+    bio = BytesIO()
+    with LZ4FrameFile(bio, 'wb') as lz:
+        lz.write(data)
+    return bio.getvalue()
+
+def decompress(data):
+    with LZ4FrameFile(BytesIO(data), 'rb') as lz:
+        return lz.read(1_000_000)
+
+test_compressed_onclient('lz4', compress, decompress)
diff --git a/sql/test/copy/Tests/compressed-on-client-xz.SQL.py 
b/sql/test/copy/Tests/compressed-on-client-xz.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/copy/Tests/compressed-on-client-xz.SQL.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python3
+
+from io import BytesIO
+from compressed_on_client import test_compressed_onclient
+
+from lzma import LZMAFile
+
+def compress(data):
+    bio = BytesIO()
+    gz = LZMAFile(bio, 'wb')
+    gz.write(data)
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to