Repository: incubator-hawq
Updated Branches:
  refs/heads/master c589334d6 -> 355c43704


HAWQ-774. Add snappy compression support to row oriented storage


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/355c4370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/355c4370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/355c4370

Branch: refs/heads/master
Commit: 355c43704cf8fd74ed46da86f7e45ac9a0d235c0
Parents: c589334
Author: Paul Guo <[email protected]>
Authored: Mon Jun 6 10:16:15 2016 +0800
Committer: Paul Guo <[email protected]>
Committed: Mon Jun 6 18:18:06 2016 +0800

----------------------------------------------------------------------
 src/backend/access/common/reloptions.c |   3 +-
 src/backend/catalog/pg_compression.c   | 124 +++++++++++++++++++++++++++-
 src/include/catalog/pg_appendonly.h    |   2 +-
 src/include/catalog/pg_compression.h   |   2 +
 src/include/catalog/pg_proc.h          |  20 +++++
 src/include/catalog/pg_proc.sql        |  10 +++
 src/include/utils/builtins.h           |   6 ++
 7 files changed, 161 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/access/common/reloptions.c
----------------------------------------------------------------------
diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 38b404a..8822d01 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -543,8 +543,7 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                                                 errOmitLocation(true)));
                }
 
-               if (!(columnstore == RELSTORAGE_PARQUET) && 
((strcmp(compresstype, "snappy") == 0)
-                               || (strcmp(compresstype, "gzip") == 0)))
+               if (!(columnstore == RELSTORAGE_PARQUET) && 
(strcmp(compresstype, "gzip") == 0))
                {
                        ereport(ERROR,
                                                
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/catalog/pg_compression.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/pg_compression.c 
b/src/backend/catalog/pg_compression.c
index 9d0ff4a..8eb51d5 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -46,6 +46,8 @@
 #include "utils/relcache.h"
 #include "utils/syscache.h"
 
+#include "snappy-c.h"
+
 /* names we expect to see in ENCODING clauses */
 char *storage_directive_names[] = {"compresstype", "compresslevel",
                                                                   "blocksize", 
NULL};
@@ -124,7 +126,7 @@ GetCompressionImplementation(char *comptype)
                                                comptype)));
 
        funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS);
-       
+
        ctup = (Form_pg_compression)GETSTRUCT(tuple);
 
        Insist(OidIsValid(ctup->compconstructor));
@@ -350,6 +352,121 @@ zlib_validator(PG_FUNCTION_ARGS)
 }
 
 Datum
+snappy_constructor(PG_FUNCTION_ARGS)
+{
+       /*
+        * Build the CompressionState for a snappy-compressed relation.
+        * Arg 0: TupleDesc, arg 1: StorageAttributes (must carry comptype).
+        * The third SQL argument (bool) is not read here.
+        */
+       TupleDesc                       td = PG_GETARG_POINTER(0);
+       StorageAttributes       *sa = PG_GETARG_POINTER(1);
+       CompressionState        *cs;
+
+       /* Validate the arguments before dereferencing sa or allocating. */
+       Insist(PointerIsValid(td));
+       Insist(PointerIsValid(sa));
+       Insist(PointerIsValid(sa->comptype));
+
+       cs = palloc0(sizeof(CompressionState));
+
+       /* The snappy routines in this file keep no per-stream state. */
+       cs->opaque = NULL;
+
+       /*
+        * snappy_compress_internal Insists that dst_sz is at least
+        * snappy_max_compressed_length(src_sz); publishing that bound here
+        * presumably lets callers size the destination buffer (TODO confirm).
+        */
+       cs->desired_sz = snappy_max_compressed_length;
+
+       PG_RETURN_POINTER(cs);
+}
+}
+
+Datum
+snappy_destructor(PG_FUNCTION_ARGS)
+{
+       /*
+        * Release resources hanging off a snappy CompressionState.
+        * snappy_constructor leaves opaque NULL; free it only if set.
+        * The CompressionState itself is not freed here.
+        */
+       CompressionState        *cs = PG_GETARG_POINTER(0);
+
+       if (cs != NULL && cs->opaque != NULL)
+       {
+               pfree(cs->opaque);
+               cs->opaque = NULL;      /* guard against a second destructor call */
+       }
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * Report a failed snappy library call as an ERROR (does not return).
+ *
+ * retval    - status returned by the snappy call
+ * func_name - name of the snappy function that failed (for the message)
+ * src_sz, dst_sz, dst_used - buffer sizes included for diagnosis
+ */
+static void
+elog_snappy_error(snappy_status retval, const char *func_name,
+                                 int src_sz, int dst_sz, int dst_used)
+{
+       switch (retval)
+       {
+               case SNAPPY_INVALID_INPUT:
+                       elog(ERROR, "invalid input for %s(): "
+                                "src_sz=%d dst_sz=%d dst_used=%d",
+                                func_name, src_sz, dst_sz, dst_used);
+                       break;
+               case SNAPPY_BUFFER_TOO_SMALL:
+                       elog(ERROR, "buffer is too small in %s(): "
+                                "src_sz=%d dst_sz=%d dst_used=%d",
+                                func_name, src_sz, dst_sz, dst_used);
+                       break;
+               default:
+                       elog(ERROR, "unknown failure (return value %d) for %s(): "
+                                "src_sz=%d dst_sz=%d dst_used=%d", (int) retval,
+                                func_name, src_sz, dst_sz, dst_used);
+                       break;
+       }
+}
+
+/*
+ * Compress src into dst with snappy.
+ *
+ * Args (see gp_snappy_compress): 0 src, 1 src_sz (int4), 2 dst,
+ * 3 dst_sz (int4), 4 dst_used (out); the 6th argument is not read here.
+ * dst must hold snappy_max_compressed_length(src_sz) bytes.
+ */
+Datum
+snappy_compress_internal(PG_FUNCTION_ARGS)
+{
+       const char              *src = PG_GETARG_POINTER(0);
+       size_t                  src_sz = PG_GETARG_INT32(1);
+       char                    *dst = PG_GETARG_POINTER(2);
+       size_t                  dst_sz = PG_GETARG_INT32(3);
+       /*
+        * int32, matching snappy_decompress_internal and the int4 size args:
+        * writing a size_t here would overrun a 4-byte caller slot.
+        * NOTE(review): confirm all callers pass an int32 *.
+        */
+       int32                   *dst_used = PG_GETARG_POINTER(4);
+       size_t                  compressed_length;
+       snappy_status   retval;
+
+       compressed_length = snappy_max_compressed_length(src_sz);
+       Insist(dst_sz >= compressed_length);
+
+       /* In: available space; out: bytes actually written. */
+       retval = snappy_compress(src, src_sz, dst, &compressed_length);
+
+       /* compressed_length <= dst_sz, which came from an int4, so it fits. */
+       *dst_used = (int32) compressed_length;
+
+       if (retval != SNAPPY_OK)
+               elog_snappy_error(retval, "snappy_compress", src_sz, dst_sz,
+                                                 *dst_used);
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * Decompress snappy data from src into dst.
+ *
+ * Args (see gp_snappy_decompress): 0 src, 1 src_sz (int4), 2 dst,
+ * 3 dst_sz (int4), 4 dst_used (out, int32); the 6th argument is not read.
+ */
+Datum
+snappy_decompress_internal(PG_FUNCTION_ARGS)
+{
+       const char              *src    = PG_GETARG_POINTER(0);
+       int32                   src_sz = PG_GETARG_INT32(1);   /* signed so the Insist below can catch negatives */
+       char                    *dst    = PG_GETARG_POINTER(2);
+       int32                   dst_sz = PG_GETARG_INT32(3);
+       int32                   *dst_used = PG_GETARG_POINTER(4);
+       size_t                  uncompressed_length;
+       snappy_status   retval;
+
+       Insist(src_sz > 0 && dst_sz > 0);
+
+       /* Read the decoded size from the snappy stream header first. */
+       retval = snappy_uncompressed_length(src, (size_t) src_sz,
+                                                                               &uncompressed_length);
+       if (retval != SNAPPY_OK)
+               /* Nothing has been written to *dst_used yet; report 0. */
+               elog_snappy_error(retval, "snappy_uncompressed_length",
+                                                 src_sz, dst_sz, 0);
+
+       Insist((size_t) dst_sz >= uncompressed_length);
+
+       retval = snappy_uncompress(src, (size_t) src_sz, dst,
+                                                          &uncompressed_length);
+       /* uncompressed_length <= dst_sz, so the int32 conversion is safe. */
+       *dst_used = (int32) uncompressed_length;
+
+       if (retval != SNAPPY_OK)
+               elog_snappy_error(retval, "snappy_uncompress",
+                                                 src_sz, dst_sz, *dst_used);
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * Validator for snappy storage options: performs no checks and
+ * unconditionally returns void.
+ */
+Datum
+snappy_validator(PG_FUNCTION_ARGS)
+{
+       /* Nothing to validate. */
+       PG_RETURN_VOID();
+}
+
+Datum
 rle_type_constructor(PG_FUNCTION_ARGS)
 {
        elog(ERROR, "rle_type block compression not supported");
@@ -437,10 +554,11 @@ compresstype_is_valid(char *comptype)
                                         cql("SELECT COUNT(*) FROM 
pg_compression "
                                                 " WHERE compname = :1 ",
                                                 NameGetDatum(&compname))));
-               
+
+       /* FIXME: This is a hack. Should register gzip handlers into 
pg_compression table. */
        if(!found)
        {
-               if((strcmp(comptype, "snappy") == 0) || strcmp(comptype, 
"gzip") == 0)
+               if(strcmp(comptype, "gzip") == 0)
                        found = true;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_appendonly.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_appendonly.h 
b/src/include/catalog/pg_appendonly.h
index c78789b..5f0ef04 100644
--- a/src/include/catalog/pg_appendonly.h
+++ b/src/include/catalog/pg_appendonly.h
@@ -75,7 +75,7 @@ CATALOG(pg_appendonly,6105) BKI_WITHOUT_OIDS
        int2                    majorversion;           /* major version 
indicating what's stored in this table  */
        int2                    minorversion;           /* minor version 
indicating what's stored in this table  */
        bool                    checksum;                       /* true if 
checksum is stored with data and checked */
-       text                    compresstype;           /* the compressor used 
(zlib, or quicklz) */
+       text                    compresstype;           /* the compressor used 
(zlib, snappy, or quicklz) */
     bool            columnstore;        /* true if co or parquet table, false 
if ao table*/
     Oid             segrelid;           /* OID of aoseg table; 0 if none */
     Oid             segidxid;           /* if aoseg table, OID of segno index 
*/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_compression.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_compression.h 
b/src/include/catalog/pg_compression.h
index 80ef711..8746db9 100644
--- a/src/include/catalog/pg_compression.h
+++ b/src/include/catalog/pg_compression.h
@@ -117,6 +117,8 @@ typedef FormData_pg_compression *Form_pg_compression;
 /* TIDYCAT_END_CODEGEN */
 
 /* Initial contents */
+DATA(insert OID = 3069 ( snappy gp_snappy_constructor gp_snappy_destructor 
gp_snappy_compress gp_snappy_decompress gp_snappy_validator PGUID ));
+
 DATA(insert OID = 3060 ( zlib gp_zlib_constructor gp_zlib_destructor 
gp_zlib_compress gp_zlib_decompress gp_zlib_validator PGUID ));
 
 DATA(insert OID = 3061 ( quicklz gp_quicklz_constructor gp_quicklz_destructor 
gp_quicklz_compress gp_quicklz_decompress gp_quicklz_validator PGUID ));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e818909..4441126 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -10312,6 +10312,26 @@ DESCR("anytable type serialization input function");
 DATA(insert OID = 3055 ( anytable_out  PGNSP PGUID 12 f f t f i 1 2275 f 
"3053" _null_ _null_ _null_ anytable_out - _null_ n ));
 DESCR("anytable type serialization output function");
 
+/* gp_snappy_constructor(internal, internal, bool) => internal */
+DATA(insert OID = 5080 ( gp_snappy_constructor  PGNSP PGUID 12 f f f f v 3 
2281 f "2281 2281 16" _null_ _null_ _null_ snappy_constructor - _null_ n ));
+DESCR("snappy constructor");
+
+/* gp_snappy_destructor(internal) => void */
+DATA(insert OID = 5081 ( gp_snappy_destructor  PGNSP PGUID 12 f f f f v 1 2278 
f "2281" _null_ _null_ _null_ snappy_destructor - _null_ n ));
+DESCR("snappy destructor");
+
+/* gp_snappy_compress(internal, int4, internal, int4, internal, internal) => 
void */
+DATA(insert OID = 5082 ( gp_snappy_compress  PGNSP PGUID 12 f f f f i 6 2278 f 
"2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_compress_internal - 
_null_ n ));
+DESCR("snappy compressor");
+
+/* gp_snappy_decompress(internal, int4, internal, int4, internal, internal) => 
void */
+DATA(insert OID = 5083 ( gp_snappy_decompress  PGNSP PGUID 12 f f f f i 6 2278 
f "2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_decompress_internal - 
_null_ n ));
+DESCR("snappy decompressor");
+
+/* gp_snappy_validator(internal) => void */
+DATA(insert OID = 9926 ( gp_snappy_validator  PGNSP PGUID 12 f f f f i 1 2278 
f "2281" _null_ _null_ _null_ snappy_validator - _null_ n ));
+DESCR("snappy compression validator");
+
 /* gp_quicklz_constructor(internal, internal, bool) => internal */
 DATA(insert OID = 5076 ( gp_quicklz_constructor  PGNSP PGUID 12 f f f f v 3 
2281 f "2281 2281 16" _null_ _null_ _null_ quicklz_constructor - _null_ n ));
 DESCR("quicklz constructor");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.sql
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql
index 987b802..50985d4 100644
--- a/src/include/catalog/pg_proc.sql
+++ b/src/include/catalog/pg_proc.sql
@@ -5450,6 +5450,16 @@
 
  CREATE FUNCTION anytable_out(anytable) RETURNS cstring LANGUAGE internal 
IMMUTABLE STRICT AS 'anytable_out' WITH (OID=3055, DESCRIPTION="anytable type 
serialization output function");
 
+ CREATE FUNCTION gp_snappy_constructor(internal, internal, bool) RETURNS 
internal LANGUAGE internal VOLATILE AS 'snappy_constructor' WITH (OID=5080, 
DESCRIPTION="snappy constructor");
+
+ CREATE FUNCTION gp_snappy_destructor(internal) RETURNS void LANGUAGE internal 
VOLATILE AS 'snappy_destructor' WITH(OID=5081, DESCRIPTION="snappy destructor");
+
+ CREATE FUNCTION gp_snappy_compress(internal, int4, internal, int4, internal, 
internal) RETURNS void LANGUAGE internal IMMUTABLE AS 
'snappy_compress_internal' WITH(OID=5082, DESCRIPTION="snappy compressor");
+
+ CREATE FUNCTION gp_snappy_decompress(internal, int4, internal, int4, 
internal, internal) RETURNS void LANGUAGE internal IMMUTABLE AS 
'snappy_decompress_internal' WITH(OID=5083, DESCRIPTION="snappy decompressor");
+
+ CREATE FUNCTION gp_snappy_validator(internal) RETURNS void LANGUAGE internal 
IMMUTABLE AS 'snappy_validator' WITH(OID=9926, DESCRIPTION="snappy compression 
validator");
+
  CREATE FUNCTION gp_quicklz_constructor(internal, internal, bool) RETURNS 
internal LANGUAGE internal VOLATILE AS 'quicklz_constructor' WITH (OID=5076, 
DESCRIPTION="quicklz constructor");
 
  CREATE FUNCTION gp_quicklz_destructor(internal) RETURNS void LANGUAGE 
internal VOLATILE AS 'quicklz_destructor' WITH(OID=5077, DESCRIPTION="quicklz 
destructor");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/utils/builtins.h
----------------------------------------------------------------------
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index af381c4..4dca6de 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1164,6 +1164,12 @@ extern Datum quicklz_compress(PG_FUNCTION_ARGS);
 extern Datum quicklz_decompress(PG_FUNCTION_ARGS);
 extern Datum quicklz_validator(PG_FUNCTION_ARGS);
 
+extern Datum snappy_constructor(PG_FUNCTION_ARGS);
+extern Datum snappy_destructor(PG_FUNCTION_ARGS);
+extern Datum snappy_compress_internal(PG_FUNCTION_ARGS);
+extern Datum snappy_decompress_internal(PG_FUNCTION_ARGS);
+extern Datum snappy_validator(PG_FUNCTION_ARGS);
+
 extern Datum zlib_constructor(PG_FUNCTION_ARGS);
 extern Datum zlib_destructor(PG_FUNCTION_ARGS);
 extern Datum zlib_compress(PG_FUNCTION_ARGS);

Reply via email to