This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git
commit c79ab3228fddc1620b6790b0fee8ffe629b77017 Author: ztao1987 <zhenglin.ta...@gmail.com> AuthorDate: Wed Mar 16 14:36:21 2022 +0800 HAWQ-1834. add options for native orc table creation --- src/backend/access/common/reloptions.c | 278 +++++++++++++++++++++------------ src/backend/access/orc/orcam.c | 25 +-- src/backend/utils/cache/relcache.c | 37 +++++ src/include/access/orcam.h | 7 + src/include/utils/rel.h | 5 +- 5 files changed, 228 insertions(+), 124 deletions(-) diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 6211fdd..5307f90 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -300,6 +300,175 @@ parseRelOptions(Datum options, int numkeywords, const char *const * keywords, } } +/* + * Parse reloptions for native orc format + */ +void +checkOrcOptions(Datum reloptions, bool validate, StdRdOptions *result) +{ + /* + * 1. Needn't check option 'appendonly' and 'orientation' because we already + * check them in default_reloptions. + * 2. 'compresslevel' is a default option in reloptions, but we actually don't + * use it in native orc format. + * 3. Everytime we add an option into orc_keywords, we should also add one + * into default_keywords because there will perform a first check. 
+ */ + const char *const orc_keywords[] = { + "appendonly", + "orientation", + "compresstype", + "compresslevel", + "dicthreshold", + "compressblocksize", + "rowindexstride", + "stripesize", + "bloomfilter", + "bucketnum", + }; + + bool appendonly = true; + char columnstore = RELSTORAGE_ORC; + char* compresstype = NULL; + int32 compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE; + int32 rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE; + int32 stripesize = DEFAULT_ORC_STRIPE_SIZE; + char* bloomfilter = NULL; + int32 bucket_num = 0; + int j = 0; + + char *orcOptionValues[ARRAY_SIZE(orc_keywords)]; + parseRelOptions(reloptions, ARRAY_SIZE(orc_keywords), orc_keywords, orcOptionValues, validate); + + /* orc compresstype */ + if (orcOptionValues[2] != NULL) + { + compresstype = orcOptionValues[2]; + + if ((strcmp(compresstype, "snappy") != 0) && (strcmp(compresstype, "lz4") != 0) + // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION, + // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1). 
+ && (strcmp(compresstype, "zlib") != 0) + && (strcmp(compresstype, "zstd") != 0) + && (strcmp(compresstype, "none") != 0)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("orc table doesn't support compress type: \'%s\'", compresstype), + errOmitLocation(true))); + } + + if (compresstype) { + StringInfoData option; + initStringInfo(&option); + appendStringInfo(&option, "\"compresstype\":\"%s\"", + compresstype); + compresstype = pstrdup(option.data); + } + } + + /* orc dicthreshold */ + if (orcOptionValues[4] != NULL) + { + char *end; + double threshold = strtod(orcOptionValues[4], &end); + if (end == orcOptionValues[4] || *end != '\0' || threshold < 0 || threshold > 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\'dicthreshold\' must be within [0-1]"), + errOmitLocation(true))); + StringInfoData option; + initStringInfo(&option); + if (compresstype != NULL) + appendStringInfo(&option, "%s,",compresstype); + appendStringInfo(&option, "\"dicthreshold\": \"%s\"", + orcOptionValues[1]); + compresstype = pstrdup(option.data); + } + + /* orc compressblocksize */ + if (orcOptionValues[5] != NULL) + { + compressblocksize = pg_atoi(orcOptionValues[5], sizeof(int32), 0); + if ((compressblocksize < MIN_ORC_COMPRESS_BLOCK_SIZE) || (compressblocksize > MAX_ORC_COMPRESS_BLOCK_SIZE)) + { + if (validate) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("compressblock size for orc table should between 1B and 1GB and should be specified in Bytes. 
" + "Got %d Bytes", compressblocksize), errOmitLocation(true))); + + compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE; + } + } + + /* orc rowgroupsize */ + if (orcOptionValues[6] != NULL) + { + rowindexstride = pg_atoi(orcOptionValues[6], sizeof(int32), 0); + + if ((rowindexstride < MIN_ORC_ROW_GROUP_SIZE) || (rowindexstride > MAX_ORC_ROW_GROUP_SIZE)) + { + if (validate) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("row group size for orc table should between 1000 and 1024*1024*1024. " + "Got %d", rowindexstride), errOmitLocation(true))); + + rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE; + } + } + + /* orc stripesize */ + if (orcOptionValues[7] != NULL) + { + stripesize = pg_atoi(orcOptionValues[7], sizeof(int32), 0); + + if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE)) + { + if (validate) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. " + "Got %d MB", stripesize), errOmitLocation(true))); + + stripesize = DEFAULT_ORC_STRIPE_SIZE; + } + } + + /* orc bloomfilter */ + if (orcOptionValues[8] != NULL) + { + StringInfoData option; + initStringInfo(&option); + appendStringInfo(&option, orcOptionValues[8]); + bloomfilter = pstrdup(option.data); + } + + /* orc bucket_num */ + if (orcOptionValues[9] != NULL) + { + bucket_num= pg_atoi(orcOptionValues[9], sizeof(int32), 0); + if(bucket_num <= 0) + { + if (validate) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bucket number should be greater than 0. 
" + "Got %d", bucket_num), errOmitLocation(true))); + + bucket_num = 0; + } + } + result->compressblocksize = compressblocksize; + result->stripesize = stripesize; + result->rowindexstride = rowindexstride; + if (compresstype != NULL) + for (j = 0;j < strlen(compresstype); j++) + compresstype[j] = pg_tolower(compresstype[j]); + result->compresstype = compresstype; + result->bloomfilter = bloomfilter; +} /* * Parse reloptions for anything using StdRdOptions @@ -323,6 +492,8 @@ default_reloptions(Datum reloptions, bool validate, char relkind, "dicthreshold", "bloomfilter", "stripesize", + "rowindexstride", + "compressblocksize", }; char *values[ARRAY_SIZE(default_keywords)]; @@ -330,7 +501,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind, int32 blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE; int32 pagesize = DEFAULT_PARQUET_PAGE_SIZE; int32 rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE; - int32 stripesize = DEFAULT_ORC_STRIPE_SIZE; bool appendonly = false; bool checksum = false; char* compresstype = NULL; @@ -542,28 +712,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind, errmsg("non-parquet table doesn't support compress type: \'%s\'", compresstype), errOmitLocation(true))); } - - if ((columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "snappy") != 0) - && (strcmp(compresstype, "lz4") != 0) - // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION, - // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1). 
- && (strcmp(compresstype, "zlib") != 0) - && (strcmp(compresstype, "zstd") != 0) - && (strcmp(compresstype, "none") != 0)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("orc table doesn't support compress type: \'%s\'", compresstype), - errOmitLocation(true))); - } - - if (!(columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "lz4") == 0)) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("non-orc table doesn't support compress type: \'%s\'", compresstype), - errOmitLocation(true))); - } } /* compression level */ @@ -644,14 +792,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind, compresslevel = setDefaultCompressionLevel(compresstype); } - if (columnstore == RELSTORAGE_ORC && compresstype) { - StringInfoData option; - initStringInfo(&option); - appendStringInfo(&option, "\"compresstype\":\"%s\"", - compresstype); - compresstype = pstrdup(option.data); - } - /* checksum */ if (values[7] != NULL) { @@ -811,75 +951,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind, errOmitLocation(true))); } - /* stripesize */ - if (values[13] != NULL) - { - if(!(columnstore == RELSTORAGE_ORC)){ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid option \'stripesize\' for non-orc table"), - errOmitLocation(true))); - } - - stripesize = pg_atoi(values[13], sizeof(int32), 0); - - if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE)) - { - if (validate) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. 
" - "Got %d MB", stripesize), errOmitLocation(true))); - - stripesize = DEFAULT_ORC_STRIPE_SIZE; - } - } - - // dicthreshold - if (values[11] != NULL) { - if(!(columnstore == RELSTORAGE_ORC)){ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid option \'dicthreshold\' for non-orc table"), - errOmitLocation(true))); - } - char *end; - double threshold = strtod(values[11], &end); - if (end == values[11] || *end != '\0' || threshold < 0 || threshold > 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("\'dicthreshold\' must be within [0-1]"), - errOmitLocation(true))); - StringInfoData option; - initStringInfo(&option); - if (compresstype != NULL) - appendStringInfo(&option, "%s,",compresstype); - appendStringInfo(&option, "\"dicthreshold\": \"%s\"", - values[11]); - compresstype = pstrdup(option.data); - } - - // bloomfilter - if (values[12] != NULL) { - if(!(columnstore == RELSTORAGE_ORC)){ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid option \'bloomfilter\' for non-orc table"), - errOmitLocation(true))); - } - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("option \'bloomfilter\' for orc table not supported yet"), - errOmitLocation(true))); - StringInfoData option; - initStringInfo(&option); - if (compresstype != NULL) - appendStringInfo(&option, "%s",compresstype); - appendStringInfo(&option, ",\"bloomfilter\": \"%s\"", - values[12]); - compresstype = pstrdup(option.data); - } - result = (StdRdOptions *) palloc(sizeof(StdRdOptions)); SET_VARSIZE(result, sizeof(StdRdOptions)); @@ -888,7 +959,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind, result->blocksize = blocksize; result->pagesize = pagesize; result->rowgroupsize = rowgroupsize; - result->stripesize = stripesize; result->compresslevel = compresslevel; if (compresstype != NULL) for (j = 0;j < strlen(compresstype); j++) @@ -900,6 +970,12 @@ default_reloptions(Datum reloptions, bool 
validate, char relkind, result->errorTable = errorTable; result->bucket_num = bucket_num; + // extra parse and check for ORC format + if (columnstore == RELSTORAGE_ORC) + { + checkOrcOptions(reloptions, validate, result); + } + return (bytea *) result; } diff --git a/src/backend/access/orc/orcam.c b/src/backend/access/orc/orcam.c index f226844..f7260c3 100644 --- a/src/backend/access/orc/orcam.c +++ b/src/backend/access/orc/orcam.c @@ -279,7 +279,6 @@ static int32 GetSplitCount(List *fileSplits, Oid idxId) { return ret; } ->>>>>>> 7910d663d... step2 OrcInsertDescData *orcBeginInsert(Relation rel, ResultRelSegFileInfo *segfileinfo) { OrcInsertDescData *insertDesc = @@ -299,17 +298,7 @@ OrcInsertDescData *orcBeginInsert(Relation rel, AppendOnlyEntry *aoentry = GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow); StringInfoData option; - initStringInfo(&option); - appendStringInfoChar(&option, '{'); - appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]); - appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64, - segfileinfo->uncompressed_eof[0]); - appendStringInfo( - &option, ", \"stripeSize\": %" PRId64, - ((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024); - if (aoentry->compresstype) - appendStringInfo(&option, ", %s", aoentry->compresstype); - appendStringInfoChar(&option, '}'); + constructOrcFormatOptionString(&option, rel, segfileinfo, aoentry); insertDesc->orcFormatData = palloc0(sizeof(OrcFormatData)); insertDesc->orcFormatData->fmt = @@ -929,11 +918,7 @@ OrcDeleteDescData *orcBeginDelete(Relation rel, List *fileSplits, AppendOnlyEntry *aoentry = GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow); StringInfoData option; - initStringInfo(&option); - appendStringInfoChar(&option, '{'); - if (aoentry->compresstype) - appendStringInfo(&option, "%s", aoentry->compresstype); - appendStringInfoChar(&option, '}'); + constructOrcFormatOptionString(&option, rel, NULL, aoentry); int hdfsPathMaxLen = 
AOSegmentFilePathNameLen(rel) + 1; char *hdfsPath = (char *)palloc0(hdfsPathMaxLen); @@ -1047,11 +1032,7 @@ OrcUpdateDescData *orcBeginUpdate(Relation rel, List *fileSplits, AppendOnlyEntry *aoentry = GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow); StringInfoData option; - initStringInfo(&option); - appendStringInfoChar(&option, '{'); - if (aoentry->compresstype) - appendStringInfo(&option, "%s", aoentry->compresstype); - appendStringInfoChar(&option, '}'); + constructOrcFormatOptionString(&option, rel, NULL, aoentry); int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1; char *hdfsPath = (char *)palloc0(hdfsPathMaxLen); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index a9f62ac..6c69dd7 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -789,9 +789,46 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple) case RELKIND_AOSEGMENTS: case RELKIND_AOBLOCKDIR: case RELKIND_UNCATALOGED: + { + // check for bloom filter here because + // we could not get tupledesc in default_reloptions. 
+ const char *const keywords[] = + { "bloomfilter" }; + const int32_t keywords_size = 1; + char *values[keywords_size]; + char *bloomfilter = NULL; + char *key = "bloomfilter"; + + parseRelOptions(datum, keywords_size, keywords, values, false); + if (values[0] != NULL) + { + TupleDesc tup_desc = relation->rd_att; + int attnum = tup_desc->natts; + char **attribute_names = palloc0(attnum * sizeof(char*)); + for (int i = 0; i < attnum; ++i) + { + int name_len = + strlen( + ((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data); + char *attribute = palloc0(name_len + 1); + strncpy(attribute, ((Form_pg_attribute ) + (tup_desc->attrs[i]))->attname.data, name_len); + attribute_names[i] = attribute; + } + char *dup_val = pstrdup(values[0]); + char *token = strtok(dup_val, ","); + while (token) + { + checkPlugStorageFormatOption(&bloomfilter, key, token, true, + attnum, attribute_names); + bloomfilter = NULL; + token = strtok(NULL, ","); + } + } options = heap_reloptions(relation->rd_rel->relkind, datum, false); break; + } case RELKIND_INDEX: options = index_reloptions(relation->rd_am->amoptions, datum, false); diff --git a/src/include/access/orcam.h b/src/include/access/orcam.h index 47d24c2..871fc05 100644 --- a/src/include/access/orcam.h +++ b/src/include/access/orcam.h @@ -26,6 +26,13 @@ #include "cdb/cdbquerycontextdispatching.h" #include "nodes/relation.h" +#define DEFAULT_ORC_ROW_GROUP_SIZE 65536 +#define MIN_ORC_ROW_GROUP_SIZE 1000 +#define MAX_ORC_ROW_GROUP_SIZE 1024 * 1024 * 1024 +// here we use orc block size in Bytes +#define DEFAULT_ORC_COMPRESS_BLOCK_SIZE 256 * 1024 +#define MIN_ORC_COMPRESS_BLOCK_SIZE 1 +#define MAX_ORC_COMPRESS_BLOCK_SIZE 1024 * 1024 * 1024 // here we use orc stripe size in MBytes #define DEFAULT_ORC_STRIPE_SIZE 64 #define MIN_ORC_STRIPE_SIZE 1 diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 3fbeee3..2fbd2ac 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -285,7 +285,6 @@ typedef struct 
StdRdOptions int blocksize; /* max varblock size (AO rels only) */ int pagesize; /* page size(Parquet rels only) */ int rowgroupsize; /* row group size (Parquet rels only)*/ - int stripesize; /* stripe size (ORC rels only) */ int compresslevel; /* compression level (AO rels only) */ char* compresstype; /* compression type (AO rels only) */ bool checksum; /* checksum (AO rels only) */ @@ -293,6 +292,10 @@ typedef struct StdRdOptions bool forceHeap; /* specified appendonly=false */ bool errorTable; /* skip GOH tablespace checking. */ int bucket_num; /* default init segment num for random/hash/external table */ + char* bloomfilter; /* columns using bloomfilter (ORC rels only) */ + int stripesize; /* stripe size (ORC rels only) */ + int rowindexstride; /* row index stride (ORC rels only) */ + int compressblocksize; /* compressblocksize in native orc, different from blocksize (ORC rels only) */ } StdRdOptions; #define HEAP_MIN_FILLFACTOR 10