Repository: incubator-hawq Updated Branches: refs/heads/master c589334d6 -> 355c43704
HAWQ-774. Add snappy compression support to row oriented storage Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/355c4370 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/355c4370 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/355c4370 Branch: refs/heads/master Commit: 355c43704cf8fd74ed46da86f7e45ac9a0d235c0 Parents: c589334 Author: Paul Guo <[email protected]> Authored: Mon Jun 6 10:16:15 2016 +0800 Committer: Paul Guo <[email protected]> Committed: Mon Jun 6 18:18:06 2016 +0800 ---------------------------------------------------------------------- src/backend/access/common/reloptions.c | 3 +- src/backend/catalog/pg_compression.c | 124 +++++++++++++++++++++++++++- src/include/catalog/pg_appendonly.h | 2 +- src/include/catalog/pg_compression.h | 2 + src/include/catalog/pg_proc.h | 20 +++++ src/include/catalog/pg_proc.sql | 10 +++ src/include/utils/builtins.h | 6 ++ 7 files changed, 161 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/access/common/reloptions.c ---------------------------------------------------------------------- diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 38b404a..8822d01 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -543,8 +543,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind, errOmitLocation(true))); } - if (!(columnstore == RELSTORAGE_PARQUET) && ((strcmp(compresstype, "snappy") == 0) - || (strcmp(compresstype, "gzip") == 0))) + if (!(columnstore == RELSTORAGE_PARQUET) && (strcmp(compresstype, "gzip") == 0)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/backend/catalog/pg_compression.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/pg_compression.c b/src/backend/catalog/pg_compression.c index 9d0ff4a..8eb51d5 100644 --- a/src/backend/catalog/pg_compression.c +++ b/src/backend/catalog/pg_compression.c @@ -46,6 +46,8 @@ #include "utils/relcache.h" #include "utils/syscache.h" +#include "snappy-c.h" + /* names we expect to see in ENCODING clauses */ char *storage_directive_names[] = {"compresstype", "compresslevel", "blocksize", NULL}; @@ -124,7 +126,7 @@ GetCompressionImplementation(char *comptype) comptype))); funcs = palloc0(sizeof(PGFunction) * NUM_COMPRESS_FUNCS); - + ctup = (Form_pg_compression)GETSTRUCT(tuple); Insist(OidIsValid(ctup->compconstructor)); @@ -350,6 +352,121 @@ zlib_validator(PG_FUNCTION_ARGS) } Datum +snappy_constructor(PG_FUNCTION_ARGS) +{ + TupleDesc td = PG_GETARG_POINTER(0); + StorageAttributes *sa = PG_GETARG_POINTER(1); + CompressionState *cs = palloc0(sizeof(CompressionState)); + + cs->opaque = NULL; + cs->desired_sz = snappy_max_compressed_length; + + Insist(PointerIsValid(td)); + Insist(PointerIsValid(sa->comptype)); + + PG_RETURN_POINTER(cs); +} + +Datum +snappy_destructor(PG_FUNCTION_ARGS) +{ + CompressionState *cs = PG_GETARG_POINTER(0); + + if (cs->opaque) + { + Insist(PointerIsValid(cs->opaque)); + pfree(cs->opaque); + } + + PG_RETURN_VOID(); +} + +static void +elog_snappy_error(snappy_status retval, char *func_name, + int src_sz, int dst_sz, int dst_used) +{ + switch (retval) + { + case SNAPPY_INVALID_INPUT: + elog(ERROR, "invalid input for %s(): " + "src_sz=%d dst_sz=%d dst_used=%d", + func_name, src_sz, dst_sz, dst_used); + break; + case SNAPPY_BUFFER_TOO_SMALL: + elog(ERROR, "buffer is too small in %s(): " + "src_sz=%d dst_sz=%d dst_used=%d", + func_name, src_sz, dst_sz, dst_used); + break; + default: + elog(ERROR, "unknown failure (return value %d) 
for %s(): " + "src_sz=%d dst_sz=%d dst_used=%d", retval, func_name, + src_sz, dst_sz, dst_used); + break; + } +} + +Datum +snappy_compress_internal(PG_FUNCTION_ARGS) +{ + const char *src = PG_GETARG_POINTER(0); + size_t src_sz = PG_GETARG_INT32(1); + char *dst = PG_GETARG_POINTER(2); + size_t dst_sz = PG_GETARG_INT32(3); + size_t *dst_used = PG_GETARG_POINTER(4); + size_t compressed_length; + snappy_status retval; + + compressed_length = snappy_max_compressed_length(src_sz); + Insist(dst_sz >= compressed_length); + + retval = snappy_compress(src, src_sz, dst, &compressed_length); + *dst_used = compressed_length; + + if (retval != SNAPPY_OK) + elog_snappy_error(retval, "snappy_compress", src_sz, dst_sz, *dst_used); + + PG_RETURN_VOID(); +} + +Datum +snappy_decompress_internal(PG_FUNCTION_ARGS) +{ + const char *src = PG_GETARG_POINTER(0); + size_t src_sz = PG_GETARG_INT32(1); + char *dst = PG_GETARG_POINTER(2); + int32 dst_sz = PG_GETARG_INT32(3); + int32 *dst_used = PG_GETARG_POINTER(4); + size_t uncompressed_length; + snappy_status retval; + + Insist(src_sz > 0 && dst_sz > 0); + + retval = snappy_uncompressed_length((char *) src, (size_t) src_sz, + &uncompressed_length); + if (retval != SNAPPY_OK) + elog_snappy_error(retval, "snappy_uncompressed_length", + src_sz, dst_sz, *dst_used); + + Insist(dst_sz >= uncompressed_length); + + retval = snappy_uncompress((char *) src, src_sz, (char *) dst, + &uncompressed_length); + *dst_used = uncompressed_length; + + if (retval != SNAPPY_OK) + elog_snappy_error(retval, "snappy_uncompressed", + src_sz, dst_sz, *dst_used); + + PG_RETURN_VOID(); +} + +Datum +snappy_validator(PG_FUNCTION_ARGS) +{ + PG_RETURN_VOID(); +} + +Datum rle_type_constructor(PG_FUNCTION_ARGS) { elog(ERROR, "rle_type block compression not supported"); @@ -437,10 +554,11 @@ compresstype_is_valid(char *comptype) cql("SELECT COUNT(*) FROM pg_compression " " WHERE compname = :1 ", NameGetDatum(&compname)))); - + + /* FIXME: This is a hack. 
Should register gzip handlers into pg_compression table. */ if(!found) { - if((strcmp(comptype, "snappy") == 0) || strcmp(comptype, "gzip") == 0) + if(strcmp(comptype, "gzip") == 0) found = true; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_appendonly.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_appendonly.h b/src/include/catalog/pg_appendonly.h index c78789b..5f0ef04 100644 --- a/src/include/catalog/pg_appendonly.h +++ b/src/include/catalog/pg_appendonly.h @@ -75,7 +75,7 @@ CATALOG(pg_appendonly,6105) BKI_WITHOUT_OIDS int2 majorversion; /* major version indicating what's stored in this table */ int2 minorversion; /* minor version indicating what's stored in this table */ bool checksum; /* true if checksum is stored with data and checked */ - text compresstype; /* the compressor used (zlib, or quicklz) */ + text compresstype; /* the compressor used (zlib, snappy, or quicklz) */ bool columnstore; /* true if co or parquet table, false if ao table*/ Oid segrelid; /* OID of aoseg table; 0 if none */ Oid segidxid; /* if aoseg table, OID of segno index */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_compression.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_compression.h b/src/include/catalog/pg_compression.h index 80ef711..8746db9 100644 --- a/src/include/catalog/pg_compression.h +++ b/src/include/catalog/pg_compression.h @@ -117,6 +117,8 @@ typedef FormData_pg_compression *Form_pg_compression; /* TIDYCAT_END_CODEGEN */ /* Initial contents */ +DATA(insert OID = 3069 ( snappy gp_snappy_constructor gp_snappy_destructor gp_snappy_compress gp_snappy_decompress gp_snappy_validator PGUID )); + DATA(insert OID = 3060 ( zlib gp_zlib_constructor gp_zlib_destructor gp_zlib_compress gp_zlib_decompress gp_zlib_validator PGUID )); DATA(insert OID = 3061 ( quicklz 
gp_quicklz_constructor gp_quicklz_destructor gp_quicklz_compress gp_quicklz_decompress gp_quicklz_validator PGUID )); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index e818909..4441126 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -10312,6 +10312,26 @@ DESCR("anytable type serialization input function"); DATA(insert OID = 3055 ( anytable_out PGNSP PGUID 12 f f t f i 1 2275 f "3053" _null_ _null_ _null_ anytable_out - _null_ n )); DESCR("anytable type serialization output function"); +/* gp_snappy_constructor(internal, internal, bool) => internal */ +DATA(insert OID = 5080 ( gp_snappy_constructor PGNSP PGUID 12 f f f f v 3 2281 f "2281 2281 16" _null_ _null_ _null_ snappy_constructor - _null_ n )); +DESCR("snappy constructor"); + +/* gp_snappy_destructor(internal) => void */ +DATA(insert OID = 5081 ( gp_snappy_destructor PGNSP PGUID 12 f f f f v 1 2278 f "2281" _null_ _null_ _null_ snappy_destructor - _null_ n )); +DESCR("snappy destructor"); + +/* gp_snappy_compress(internal, int4, internal, int4, internal, internal) => void */ +DATA(insert OID = 5082 ( gp_snappy_compress PGNSP PGUID 12 f f f f i 6 2278 f "2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_compress_internal - _null_ n )); +DESCR("snappy compressor"); + +/* gp_snappy_decompress(internal, int4, internal, int4, internal, internal) => void */ +DATA(insert OID = 5083 ( gp_snappy_decompress PGNSP PGUID 12 f f f f i 6 2278 f "2281 23 2281 23 2281 2281" _null_ _null_ _null_ snappy_decompress_internal - _null_ n )); +DESCR("snappy decompressor"); + +/* gp_snappy_validator(internal) => void */ +DATA(insert OID = 9926 ( gp_snappy_validator PGNSP PGUID 12 f f f f i 1 2278 f "2281" _null_ _null_ _null_ snappy_validator - _null_ n )); +DESCR("snappy compression validator"); + 
/* gp_quicklz_constructor(internal, internal, bool) => internal */ DATA(insert OID = 5076 ( gp_quicklz_constructor PGNSP PGUID 12 f f f f v 3 2281 f "2281 2281 16" _null_ _null_ _null_ quicklz_constructor - _null_ n )); DESCR("quicklz constructor"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/catalog/pg_proc.sql ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql index 987b802..50985d4 100644 --- a/src/include/catalog/pg_proc.sql +++ b/src/include/catalog/pg_proc.sql @@ -5450,6 +5450,16 @@ CREATE FUNCTION anytable_out(anytable) RETURNS cstring LANGUAGE internal IMMUTABLE STRICT AS 'anytable_out' WITH (OID=3055, DESCRIPTION="anytable type serialization output function"); + CREATE FUNCTION gp_snappy_constructor(internal, internal, bool) RETURNS internal LANGUAGE internal VOLATILE AS 'snappy_constructor' WITH (OID=5080, DESCRIPTION="snappy constructor"); + + CREATE FUNCTION gp_snappy_destructor(internal) RETURNS void LANGUAGE internal VOLATILE AS 'snappy_destructor' WITH(OID=5081, DESCRIPTION="snappy destructor"); + + CREATE FUNCTION gp_snappy_compress(internal, int4, internal, int4, internal, internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_compress_internal' WITH(OID=5082, DESCRIPTION="snappy compressor"); + + CREATE FUNCTION gp_snappy_decompress(internal, int4, internal, int4, internal, internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_decompress_internal' WITH(OID=5083, DESCRIPTION="snappy decompressor"); + + CREATE FUNCTION gp_snappy_validator(internal) RETURNS void LANGUAGE internal IMMUTABLE AS 'snappy_validator' WITH(OID=9926, DESCRIPTION="snappy compression validator"); + CREATE FUNCTION gp_quicklz_constructor(internal, internal, bool) RETURNS internal LANGUAGE internal VOLATILE AS 'quicklz_constructor' WITH (OID=5076, DESCRIPTION="quicklz constructor"); CREATE FUNCTION gp_quicklz_destructor(internal) 
RETURNS void LANGUAGE internal VOLATILE AS 'quicklz_destructor' WITH(OID=5077, DESCRIPTION="quicklz destructor"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/355c4370/src/include/utils/builtins.h ---------------------------------------------------------------------- diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index af381c4..4dca6de 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1164,6 +1164,12 @@ extern Datum quicklz_compress(PG_FUNCTION_ARGS); extern Datum quicklz_decompress(PG_FUNCTION_ARGS); extern Datum quicklz_validator(PG_FUNCTION_ARGS); +extern Datum snappy_constructor(PG_FUNCTION_ARGS); +extern Datum snappy_destructor(PG_FUNCTION_ARGS); +extern Datum snappy_compress_internal(PG_FUNCTION_ARGS); +extern Datum snappy_decompress_internal(PG_FUNCTION_ARGS); +extern Datum snappy_validator(PG_FUNCTION_ARGS); + extern Datum zlib_constructor(PG_FUNCTION_ARGS); extern Datum zlib_destructor(PG_FUNCTION_ARGS); extern Datum zlib_compress(PG_FUNCTION_ARGS);
