This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit c79ab3228fddc1620b6790b0fee8ffe629b77017
Author: ztao1987 <zhenglin.ta...@gmail.com>
AuthorDate: Wed Mar 16 14:36:21 2022 +0800

    HAWQ-1834. add options for native orc table creation
---
 src/backend/access/common/reloptions.c | 278 +++++++++++++++++++++------------
 src/backend/access/orc/orcam.c         |  25 +--
 src/backend/utils/cache/relcache.c     |  37 +++++
 src/include/access/orcam.h             |   7 +
 src/include/utils/rel.h                |   5 +-
 5 files changed, 228 insertions(+), 124 deletions(-)

diff --git a/src/backend/access/common/reloptions.c 
b/src/backend/access/common/reloptions.c
index 6211fdd..5307f90 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -300,6 +300,175 @@ parseRelOptions(Datum options, int numkeywords, const 
char *const * keywords,
        }
 }
 
+/*
+ * checkOrcOptions
+ *   Parse the reloptions that are specific to the native orc format,
+ *   validate them and store the outcome in *result.
+ *
+ * reloptions: raw options datum, handed to parseRelOptions().
+ * validate:   forwarded to parseRelOptions(). For the numeric options
+ *             (compressblocksize, rowindexstride, stripesize, bucketnum) an
+ *             out-of-range value raises an ERROR only when validate is true;
+ *             otherwise the default value is used instead.
+ * result:     receives compressblocksize, stripesize, rowindexstride,
+ *             compresstype and bloomfilter. No other field is written here.
+ *
+ * An unsupported 'compresstype' or a malformed 'dicthreshold' raises an
+ * ERROR regardless of validate.
+ *
+ * result->compresstype does not hold the bare codec name: it holds a
+ * lower-cased fragment of the form
+ *   "compresstype":"<type>","dicthreshold": "<value>"
+ * in which either part may be absent (NULL when both are absent).
+ *
+ * Notes:
+ * 1. Needn't check option 'appendonly' and 'orientation' because we already
+ *    check them in default_reloptions.
+ * 2. 'compresslevel' is a default option in reloptions, but we actually
+ *    don't use it in native orc format.
+ * 3. Every time an option is added to orc_keywords, it must also be added to
+ *    default_keywords, because default_reloptions performs the first check.
+ */
+void
+checkOrcOptions(Datum reloptions, bool validate, StdRdOptions *result)
+{
+  /*
+   * Symbolic positions inside orc_keywords / orcOptionValues. Using names
+   * instead of bare numbers keeps every lookup tied to the right keyword.
+   */
+  enum
+  {
+    ORC_OPT_APPENDONLY = 0,
+    ORC_OPT_ORIENTATION,
+    ORC_OPT_COMPRESSTYPE,
+    ORC_OPT_COMPRESSLEVEL,
+    ORC_OPT_DICTHRESHOLD,
+    ORC_OPT_COMPRESSBLOCKSIZE,
+    ORC_OPT_ROWINDEXSTRIDE,
+    ORC_OPT_STRIPESIZE,
+    ORC_OPT_BLOOMFILTER,
+    ORC_OPT_BUCKETNUM
+  };
+
+  const char *const orc_keywords[] = {
+    [ORC_OPT_APPENDONLY] = "appendonly",
+    [ORC_OPT_ORIENTATION] = "orientation",
+    [ORC_OPT_COMPRESSTYPE] = "compresstype",
+    [ORC_OPT_COMPRESSLEVEL] = "compresslevel",
+    [ORC_OPT_DICTHRESHOLD] = "dicthreshold",
+    [ORC_OPT_COMPRESSBLOCKSIZE] = "compressblocksize",
+    [ORC_OPT_ROWINDEXSTRIDE] = "rowindexstride",
+    [ORC_OPT_STRIPESIZE] = "stripesize",
+    [ORC_OPT_BLOOMFILTER] = "bloomfilter",
+    [ORC_OPT_BUCKETNUM] = "bucketnum",
+  };
+
+  char   *compresstype = NULL;
+  char   *bloomfilter = NULL;
+  int32   compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+  int32   rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+  int32   stripesize = DEFAULT_ORC_STRIPE_SIZE;
+  char   *orcOptionValues[ARRAY_SIZE(orc_keywords)];
+
+  parseRelOptions(reloptions, ARRAY_SIZE(orc_keywords), orc_keywords,
+                  orcOptionValues, validate);
+
+  /* orc compresstype */
+  if (orcOptionValues[ORC_OPT_COMPRESSTYPE] != NULL)
+  {
+    const char *type = orcOptionValues[ORC_OPT_COMPRESSTYPE];
+    StringInfoData option;
+
+    // XXX(changyong): The default zlib compression level of ORC table is
+    // Z_DEFAULT_COMPRESSION, and this is different from hive of which
+    // default compression level is (Z_BEST_SPEED + 1).
+    if (strcmp(type, "snappy") != 0
+        && strcmp(type, "lz4") != 0
+        && strcmp(type, "zlib") != 0
+        && strcmp(type, "zstd") != 0
+        && strcmp(type, "none") != 0)
+    {
+      ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("orc table doesn't support compress type: \'%s\'", type),
+             errOmitLocation(true)));
+    }
+
+    initStringInfo(&option);
+    appendStringInfo(&option, "\"compresstype\":\"%s\"", type);
+    compresstype = pstrdup(option.data);
+  }
+
+  /* orc dicthreshold */
+  if (orcOptionValues[ORC_OPT_DICTHRESHOLD] != NULL)
+  {
+    const char *raw = orcOptionValues[ORC_OPT_DICTHRESHOLD];
+    char *end;
+    double threshold = strtod(raw, &end);
+    StringInfoData option;
+
+    /* the negated range test also rejects NaN, which fails both comparisons */
+    if (end == raw || *end != '\0' || !(threshold >= 0 && threshold <= 1))
+      ereport(ERROR,
+        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+         errmsg("\'dicthreshold\' must be within [0-1]"),
+             errOmitLocation(true)));
+
+    /* append to the compresstype fragment, if there is one */
+    initStringInfo(&option);
+    if (compresstype != NULL)
+      appendStringInfo(&option, "%s,", compresstype);
+    /* emit the dicthreshold text itself, not a neighbouring option's value */
+    appendStringInfo(&option, "\"dicthreshold\": \"%s\"", raw);
+    compresstype = pstrdup(option.data);
+  }
+
+  /* orc compressblocksize */
+  if (orcOptionValues[ORC_OPT_COMPRESSBLOCKSIZE] != NULL)
+  {
+    compressblocksize = pg_atoi(orcOptionValues[ORC_OPT_COMPRESSBLOCKSIZE],
+                                sizeof(int32), 0);
+    if ((compressblocksize < MIN_ORC_COMPRESS_BLOCK_SIZE) ||
+        (compressblocksize > MAX_ORC_COMPRESS_BLOCK_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("compressblock size for orc table should between 1B and 1GB and should be specified in Bytes. "
+                 "Got %d Bytes", compressblocksize), errOmitLocation(true)));
+
+      compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+    }
+  }
+
+  /* orc rowindexstride */
+  if (orcOptionValues[ORC_OPT_ROWINDEXSTRIDE] != NULL)
+  {
+    rowindexstride = pg_atoi(orcOptionValues[ORC_OPT_ROWINDEXSTRIDE],
+                             sizeof(int32), 0);
+    if ((rowindexstride < MIN_ORC_ROW_GROUP_SIZE) ||
+        (rowindexstride > MAX_ORC_ROW_GROUP_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("row group size for orc table should between 1000 and 1024*1024*1024. "
+                 "Got %d", rowindexstride), errOmitLocation(true)));
+
+      rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+    }
+  }
+
+  /* orc stripesize */
+  if (orcOptionValues[ORC_OPT_STRIPESIZE] != NULL)
+  {
+    stripesize = pg_atoi(orcOptionValues[ORC_OPT_STRIPESIZE],
+                         sizeof(int32), 0);
+    if ((stripesize < MIN_ORC_STRIPE_SIZE) ||
+        (stripesize > MAX_ORC_STRIPE_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
+                 "Got %d MB", stripesize), errOmitLocation(true)));
+
+      stripesize = DEFAULT_ORC_STRIPE_SIZE;
+    }
+  }
+
+  /*
+   * orc bloomfilter: copy the text verbatim. It is user input, so it must
+   * never be interpreted as a printf-style format string.
+   */
+  if (orcOptionValues[ORC_OPT_BLOOMFILTER] != NULL)
+    bloomfilter = pstrdup(orcOptionValues[ORC_OPT_BLOOMFILTER]);
+
+  /*
+   * orc bucket_num: only validated here, nothing is stored.
+   * NOTE(review): presumably default_reloptions has already filled
+   * result->bucket_num before calling us -- confirm against the caller.
+   */
+  if (orcOptionValues[ORC_OPT_BUCKETNUM] != NULL)
+  {
+    int32 bucket_num = pg_atoi(orcOptionValues[ORC_OPT_BUCKETNUM],
+                               sizeof(int32), 0);
+
+    if (bucket_num <= 0 && validate)
+      ereport(ERROR,
+          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+           errmsg("bucket number should be greater than 0. "
+               "Got %d", bucket_num), errOmitLocation(true)));
+  }
+
+  result->compressblocksize = compressblocksize;
+  result->stripesize = stripesize;
+  result->rowindexstride = rowindexstride;
+
+  /* lower-case the whole fragment in place, walking to the terminator */
+  if (compresstype != NULL)
+  {
+    for (char *p = compresstype; *p != '\0'; p++)
+      *p = pg_tolower((unsigned char) *p);
+  }
+  result->compresstype = compresstype;
+  result->bloomfilter = bloomfilter;
+}
 
 /*
  * Parse reloptions for anything using StdRdOptions
@@ -323,6 +492,8 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                "dicthreshold",
                "bloomfilter",
                "stripesize",
+               "rowindexstride",
+               "compressblocksize",
        };
 
        char       *values[ARRAY_SIZE(default_keywords)];
@@ -330,7 +501,6 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
        int32           blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE;
        int32           pagesize = DEFAULT_PARQUET_PAGE_SIZE;
        int32           rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE;
-       int32   stripesize = DEFAULT_ORC_STRIPE_SIZE;
        bool            appendonly = false;
        bool            checksum = false;
        char*           compresstype = NULL;
@@ -542,28 +712,6 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                                                 errmsg("non-parquet table 
doesn't support compress type: \'%s\'", compresstype),
                                                 errOmitLocation(true)));
                }
-
-               if ((columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, 
"snappy") != 0)
-                       && (strcmp(compresstype, "lz4") != 0)
-                       // XXX(changyong): The default zlib compression level 
of ORC table is Z_DEFAULT_COMPRESSION,
-                       // and this is different from hive of which default 
compression level is (Z_BEST_SPEED + 1).
-                       && (strcmp(compresstype, "zlib") != 0)
-                       && (strcmp(compresstype, "zstd") != 0)
-                       && (strcmp(compresstype, "none") != 0))
-    {
-      ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("orc table doesn't support compress type: \'%s\'", 
compresstype),
-             errOmitLocation(true)));
-    }
-
-               if (!(columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, 
"lz4") == 0))
-    {
-      ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("non-orc table doesn't support compress type: \'%s\'", 
compresstype),
-             errOmitLocation(true)));
-    }
        }
 
        /* compression level */
@@ -644,14 +792,6 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                compresslevel = setDefaultCompressionLevel(compresstype);
        }
 
-       if (columnstore == RELSTORAGE_ORC && compresstype) {
-    StringInfoData option;
-    initStringInfo(&option);
-    appendStringInfo(&option, "\"compresstype\":\"%s\"",
-                           compresstype);
-    compresstype = pstrdup(option.data);
-  }
-
        /* checksum */
        if (values[7] != NULL)
        {
@@ -811,75 +951,6 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
                                         errOmitLocation(true)));
        }
 
-  /* stripesize */
-  if (values[13] != NULL)
-  {
-    if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'stripesize\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-
-    stripesize = pg_atoi(values[13], sizeof(int32), 0);
-
-    if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > 
MAX_ORC_STRIPE_SIZE))
-    {
-      if (validate)
-        ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("stripe size for orc table should between 1MB and 1GB and 
should be specified in MBytes. "
-                 "Got %d MB", stripesize), errOmitLocation(true)));
-
-      stripesize = DEFAULT_ORC_STRIPE_SIZE;
-    }
-  }
-
-       // dicthreshold
-       if (values[11] != NULL) {
-         if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'dicthreshold\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-         char *end;
-         double threshold = strtod(values[11], &end);
-         if (end == values[11] || *end != '\0' || threshold < 0 || threshold > 
1)
-           ereport(ERROR,
-        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-         errmsg("\'dicthreshold\' must be within [0-1]"),
-             errOmitLocation(true)));
-         StringInfoData option;
-         initStringInfo(&option);
-         if (compresstype != NULL)
-           appendStringInfo(&option, "%s,",compresstype);
-         appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
-                          values[11]);
-         compresstype = pstrdup(option.data);
-       }
-
-       // bloomfilter
-       if (values[12] != NULL) {
-    if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'bloomfilter\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-    ereport(ERROR,
-        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-         errmsg("option \'bloomfilter\' for orc table not supported yet"),
-         errOmitLocation(true)));
-    StringInfoData option;
-    initStringInfo(&option);
-    if (compresstype != NULL)
-      appendStringInfo(&option, "%s",compresstype);
-    appendStringInfo(&option, ",\"bloomfilter\": \"%s\"",
-                     values[12]);
-    compresstype = pstrdup(option.data);
-  }
-
        result = (StdRdOptions *) palloc(sizeof(StdRdOptions));
        SET_VARSIZE(result, sizeof(StdRdOptions));
 
@@ -888,7 +959,6 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
        result->blocksize = blocksize;
        result->pagesize = pagesize;
        result->rowgroupsize = rowgroupsize;
-       result->stripesize = stripesize;
        result->compresslevel = compresslevel;
        if (compresstype != NULL)
                for (j = 0;j < strlen(compresstype); j++)
@@ -900,6 +970,12 @@ default_reloptions(Datum reloptions, bool validate, char 
relkind,
        result->errorTable = errorTable;
        result->bucket_num = bucket_num;
 
+       // extra parse and check for ORC format
+       if (columnstore == RELSTORAGE_ORC)
+       {
+               checkOrcOptions(reloptions, validate, result);
+       }
+
        return (bytea *) result;
 }
 
diff --git a/src/backend/access/orc/orcam.c b/src/backend/access/orc/orcam.c
index f226844..f7260c3 100644
--- a/src/backend/access/orc/orcam.c
+++ b/src/backend/access/orc/orcam.c
@@ -279,7 +279,6 @@ static int32 GetSplitCount(List *fileSplits, Oid idxId) {
   return ret;
 }
 
->>>>>>> 7910d663d... step2
 OrcInsertDescData *orcBeginInsert(Relation rel,
                                   ResultRelSegFileInfo *segfileinfo) {
   OrcInsertDescData *insertDesc =
@@ -299,17 +298,7 @@ OrcInsertDescData *orcBeginInsert(Relation rel,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]);
-  appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64,
-                   segfileinfo->uncompressed_eof[0]);
-  appendStringInfo(
-      &option, ", \"stripeSize\": %" PRId64,
-      ((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024);
-  if (aoentry->compresstype)
-    appendStringInfo(&option, ", %s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, segfileinfo, aoentry);
 
   insertDesc->orcFormatData = palloc0(sizeof(OrcFormatData));
   insertDesc->orcFormatData->fmt =
@@ -929,11 +918,7 @@ OrcDeleteDescData *orcBeginDelete(Relation rel, List 
*fileSplits,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  if (aoentry->compresstype)
-    appendStringInfo(&option, "%s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, NULL, aoentry);
 
   int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
   char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
@@ -1047,11 +1032,7 @@ OrcUpdateDescData *orcBeginUpdate(Relation rel, List 
*fileSplits,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  if (aoentry->compresstype)
-    appendStringInfo(&option, "%s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, NULL, aoentry);
 
   int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
   char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
diff --git a/src/backend/utils/cache/relcache.c 
b/src/backend/utils/cache/relcache.c
index a9f62ac..6c69dd7 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -789,9 +789,46 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
                case RELKIND_AOSEGMENTS:
                case RELKIND_AOBLOCKDIR:
                case RELKIND_UNCATALOGED:
+               {
+                       // check for bloom filter here because
+                       // we could not get tupledesc in default_reloptions.
+                       const char *const keywords[] =
+                       { "bloomfilter" };
+                       const int32_t keywords_size = 1;
+                       char *values[keywords_size];
+                       char *bloomfilter = NULL;
+                       char *key = "bloomfilter";
+
+                       parseRelOptions(datum, keywords_size, keywords, values, 
false);
+                       if (values[0] != NULL)
+                       {
+                               TupleDesc tup_desc = relation->rd_att;
+                               int attnum = tup_desc->natts;
+                               char **attribute_names = palloc0(attnum * 
sizeof(char*));
+                               for (int i = 0; i < attnum; ++i)
+                               {
+                                       int name_len =
+                                                       strlen(
+                                                                       
((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data);
+                                       char *attribute = palloc0(name_len + 1);
+                                       strncpy(attribute, ((Form_pg_attribute )
+                                       (tup_desc->attrs[i]))->attname.data, 
name_len);
+                                       attribute_names[i] = attribute;
+                               }
+                               char *dup_val = pstrdup(values[0]);
+                               char *token = strtok(dup_val, ",");
+                               while (token)
+                               {
+                                       
checkPlugStorageFormatOption(&bloomfilter, key, token, true,
+                                                       attnum, 
attribute_names);
+                                       bloomfilter = NULL;
+                                       token = strtok(NULL, ",");
+                               }
+                       }
                        options = heap_reloptions(relation->rd_rel->relkind, 
datum,
                                                                          
false);
                        break;
+               }
                case RELKIND_INDEX:
                        options = index_reloptions(relation->rd_am->amoptions, 
datum,
                                                                           
false);
diff --git a/src/include/access/orcam.h b/src/include/access/orcam.h
index 47d24c2..871fc05 100644
--- a/src/include/access/orcam.h
+++ b/src/include/access/orcam.h
@@ -26,6 +26,13 @@
 #include "cdb/cdbquerycontextdispatching.h"
 #include "nodes/relation.h"
 
+#define DEFAULT_ORC_ROW_GROUP_SIZE 65536 /* row index stride default */
+#define MIN_ORC_ROW_GROUP_SIZE 1000
+#define MAX_ORC_ROW_GROUP_SIZE (1024 * 1024 * 1024) /* parenthesized so the product stays one operand */
+// here we use orc block size in Bytes
+#define DEFAULT_ORC_COMPRESS_BLOCK_SIZE (256 * 1024)
+#define MIN_ORC_COMPRESS_BLOCK_SIZE 1
+#define MAX_ORC_COMPRESS_BLOCK_SIZE (1024 * 1024 * 1024)
 // here we use orc stripe size in MBytes
 #define DEFAULT_ORC_STRIPE_SIZE 64
 #define MIN_ORC_STRIPE_SIZE 1
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 3fbeee3..2fbd2ac 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -285,7 +285,6 @@ typedef struct StdRdOptions
        int                     blocksize;              /* max varblock size 
(AO rels only) */
        int                     pagesize;               /* page size(Parquet 
rels only) */
        int                     rowgroupsize;   /* row group size (Parquet rels 
only)*/
-       int     stripesize;  /* stripe size (ORC rels only) */
        int                     compresslevel;  /* compression level (AO rels 
only) */
        char*           compresstype;   /* compression type (AO rels only) */
        bool            checksum;               /* checksum (AO rels only) */
@@ -293,6 +292,10 @@ typedef struct StdRdOptions
        bool            forceHeap;              /* specified appendonly=false */
        bool            errorTable;             /* skip GOH tablespace 
checking. */
        int             bucket_num;             /* default init segment num for 
random/hash/external table */
+       char*           bloomfilter;    /* columns using bloomfilter (ORC rels 
only) */
+       int             stripesize;     /* stripe size (ORC rels only) */
+       int             rowindexstride; /* row index stride (ORC rels only) */
+       int             compressblocksize;  /* compressblocksize in native orc, 
different from blocksize (ORC rels only) */
 } StdRdOptions;
 
 #define HEAP_MIN_FILLFACTOR                    10

Reply via email to