Repository: incubator-hawq
Updated Branches:
  refs/heads/master b6391f191 -> d8c8eb678


HAWQ-1606. This commit implements deciding to create Bloom Filter during query 
plan and create Bloom filter for inner table, including:

    1. Introduce a GUC, hawq_hashjoin_bloomfilter_max_memory_size, controls the 
maximum memory size for one bloom filter in hash join.
    2. Introduce a GUC, hawq_hashjoin_bloomfilter_ratio, when the ratio of (the 
estimated number of hash join tuples)/(number of tuples of outer table) is 
lower than the GUC, then Bloom filter can be used in hash join.
    3. Decide whether to create Bloom filter during query plan phase.
    4. During query execution phase, create Bloom filter structure and 
poputlate it for tuples from inner table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/d8c8eb67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/d8c8eb67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/d8c8eb67

Branch: refs/heads/master
Commit: d8c8eb678767198e6820aa9ac93edaf8e4a7bb61
Parents: b6391f1
Author: Wen Lin <w...@pivotal.io>
Authored: Tue Apr 17 14:05:23 2018 +0800
Committer: Wen Lin <w...@pivotal.io>
Committed: Tue Apr 17 14:05:23 2018 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbvars.c                       |   3 +
 src/backend/executor/nodeHash.c                 |  47 ++++++-
 src/backend/executor/nodeHashjoin.c             |   2 +
 src/backend/nodes/outfast.c                     |   2 +
 src/backend/nodes/readfast.c                    |   2 +
 src/backend/optimizer/path/costsize.c           |   3 +
 src/backend/optimizer/plan/createplan.c         |  37 +++++
 .../include/utils/simplestring.h                |   1 +
 .../resourcemanager/utils/simplestring.c        |  35 +++++
 src/backend/utils/hash/Makefile                 |   2 +-
 src/backend/utils/hash/bloomfilter.c            | 135 +++++++++++++++++++
 src/backend/utils/misc/guc.c                    |  40 ++++--
 src/include/cdb/cdbvars.h                       |   9 ++
 src/include/executor/hashjoin.h                 |   3 +
 src/include/nodes/execnodes.h                   |   4 +-
 src/include/nodes/plannodes.h                   |   2 +
 src/include/nodes/relation.h                    |   2 +
 src/include/utils/bloomfilter.h                 |  58 ++++++++
 18 files changed, 373 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/cdb/cdbvars.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index d8e8552..c017fad 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -236,6 +236,9 @@ int                 gp_hashagg_spillbatch_max = 0;
 /* hash join to use bloom filter: default to 0, means not used */
 int            hawq_hashjoin_bloomfilter = 0;
 
+/* maximum memory size for one Bloom filter */
+char*          hawq_hashjoin_bloomfilter_max_memory_size;
+
 /* Analyzing aid */
 int            gp_motion_slice_noop = 0;
 #ifdef ENABLE_LTRACE

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/executor/nodeHash.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index a461598..ec31d9a 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -56,7 +56,7 @@
 #include "utils/lsyscache.h"
 #include "utils/debugbreak.h"
 #include "utils/faultinjector.h"
-
+#include "resourcemanager/utils/simplestring.h"
 #include "cdb/cdbexplain.h"
 #include "cdb/cdbvars.h"
 
@@ -155,6 +155,12 @@ MultiExecHash(HashState *node)
                if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, 
node->hs_keepnull, &hashvalue, &hashkeys_null))
                {
                        ExecHashTableInsert(node, hashtable, slot, hashvalue);
+                       /* Insert hash values into Bloom filter */
+                       if (node->hashtable->bloomfilter != NULL && 
node->hashtable->bloomfilter->isCreated)
+                       {
+                               InsertBloomFilter(node->hashtable->bloomfilter, 
hashvalue);
+                               node->hashtable->bloomfilter->nInserted++;
+                       }
                }
 
                if (hashkeys_null)
@@ -321,6 +327,7 @@ ExecHashTableCreate(HashState *hashState, HashJoinState 
*hjstate, List *hashOper
         */
        hashtable = (HashJoinTable)palloc0(sizeof(HashJoinTableData));
        hashtable->buckets = NULL;
+       hashtable->bloomfilter = NULL;
        hashtable->curbatch = 0;
        hashtable->growEnabled = true;
        hashtable->totalTuples = 0;
@@ -348,6 +355,15 @@ ExecHashTableCreate(HashState *hashState, HashJoinState 
*hjstate, List *hashOper
                                                                                
                ALLOCSET_DEFAULT_INITSIZE,
                                                                                
                ALLOCSET_DEFAULT_MAXSIZE);
 
+       if (hjstate->useRuntimeFilter)
+       {
+               hashtable->bloomfilterCtx = 
AllocSetContextCreate(hashtable->hashCxt,
+                                                                               
                        "HashTableBloomFilterContext",
+                                                                               
                        ALLOCSET_DEFAULT_MINSIZE,
+                                                                               
                        ALLOCSET_DEFAULT_INITSIZE,
+                                                                               
                        ALLOCSET_DEFAULT_MAXSIZE);
+       }
+
        /* CDB */ /* track temp buf file allocations in separate context */
        hashtable->bfCxt = AllocSetContextCreate(CurrentMemoryContext,
                                                                                
         "hbbfcxt",
@@ -453,6 +469,30 @@ ExecHashTableCreate(HashState *hashState, HashJoinState 
*hjstate, List *hashOper
                palloc0(nbuckets * sizeof(HashJoinTuple));
 
        MemoryContextSwitchTo(oldcxt);
+
+
+       /*
+        * Initialize Bloom filter
+        */
+       if (hjstate->useRuntimeFilter)
+       {
+               /*
+                * The size of Bloom filter is decided by the estimated number 
of
+                * tuples from inner table, but won't exceed the GUC value
+                */
+               MemoryContextSwitchTo(hashtable->bloomfilterCtx);
+               SimpString valuestr;
+               setSimpleStringRef(&valuestr,
+                               hawq_hashjoin_bloomfilter_max_memory_size, 
strlen(hawq_hashjoin_bloomfilter_max_memory_size));
+               uint64_t max_size = 0;
+               SimpleStringToBytes(&valuestr, &max_size);
+               max_size = UpperPowerTwo(max_size);
+
+               int size = UpperPowerTwo(hjstate->estimatedInnerNum);
+               hashtable->bloomfilter = InitBloomFilter(min(size, max_size));
+               MemoryContextSwitchTo(oldcxt);
+       }
+
        }
        END_MEMORY_ACCOUNT();
        return hashtable;
@@ -716,7 +756,10 @@ ExecHashTableDestroy(HashState *hashState, HashJoinTable 
hashtable)
                hashtable->work_set = NULL;
        }
 
-       /* Release working memory (batchCxt is a child, so it goes away too) */
+       /*
+        * Release working memory (batchCxt and bloomfilterCxt are children,
+        * so they go away too)
+        */
        MemoryContextDelete(hashtable->hashCxt);
        hashtable->batches = NULL;
        }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/executor/nodeHashjoin.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeHashjoin.c 
b/src/backend/executor/nodeHashjoin.c
index 9a8964d..cf2b596 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -552,6 +552,8 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
        hjstate = makeNode(HashJoinState);
        hjstate->js.ps.plan = (Plan *) node;
        hjstate->js.ps.state = estate;
+       hjstate->useRuntimeFilter = node->useRuntimeFilter;
+       hjstate->estimatedInnerNum = node->estimatedInnerNum;
 
        /*
         * Miscellaneous initialization

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/nodes/outfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 591a859..1ebd11f 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -777,6 +777,8 @@ _outHashJoin(StringInfo str, HashJoin *node)
 
        WRITE_LIST_FIELD(hashclauses);
        WRITE_LIST_FIELD(hashqualclauses);
+       WRITE_BOOL_FIELD(useRuntimeFilter);
+       WRITE_INT_FIELD(estimatedInnerNum);
 }
 
 static void

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/nodes/readfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index 7c2635c..f9aee80 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -3323,6 +3323,8 @@ _readHashJoin(const char ** str)
 
        READ_NODE_FIELD(hashclauses);
        READ_NODE_FIELD(hashqualclauses);
+       READ_BOOL_FIELD(useRuntimeFilter);
+       READ_INT_FIELD(estimatedInnerNum);
 
        READ_DONE();
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/optimizer/path/costsize.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/path/costsize.c 
b/src/backend/optimizer/path/costsize.c
index 48cb5ad..04de301 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1878,6 +1878,9 @@ cost_hashjoin(HashPath *path, PlannerInfo *root)
 
        /* approx # tuples passing the hash quals */
        hashjointuples = clamp_row_est(hash_selec * outer_path_rows * 
inner_path_rows);
+       /* For deciding if use bloomfilter on hash join */
+       path->hashjoin_ratio = (hashjointuples/outer_path_rows > 1.0) ? 1.0 : 
hashjointuples/outer_path_rows;
+       path->estimatedNumInner = inner_path_rows;
 
        /* cost of source data */
        startup_cost += outer_path->startup_cost;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/optimizer/plan/createplan.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/createplan.c 
b/src/backend/optimizer/plan/createplan.c
index 56c6a87..7a7f261 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -2912,6 +2912,29 @@ create_mergejoin_plan(CreatePlanContext *ctx,
        return join_plan;
 }
 
+/*
+ * Decide if use runtime filter for this hash join.
+ * If use, three conditions must be satisfied:
+ *     1. GUC hawq_hashjoin_bloomfilter is enable;
+ *     2. This hash join is not left outer join or full outer join or 
anti-join;
+ *     3. The ratio of (the estimated number of hash join tuples)/(number of 
tuples of outer table)
+ *             is lower than the GUC hawq_hashjoin_bloomfilter_ratio;
+ */
+static bool
+decide_use_runtime_filter(HashPath* path, JoinType type)
+{
+       if (!hawq_hashjoin_bloomfilter || type == JOIN_LEFT || type == JOIN_FULL
+                       || type == JOIN_LASJ || type == JOIN_LASJ_NOTIN ||
+                       path->hashjoin_ratio > hawq_hashjoin_bloomfilter_ratio)
+       {
+               return false;
+       }
+       else
+       {
+               return true;
+       }
+}
+
 static HashJoin *
 create_hashjoin_plan(CreatePlanContext *ctx,
                                         HashPath *best_path,
@@ -2980,6 +3003,20 @@ create_hashjoin_plan(CreatePlanContext *ctx,
                                                          (Plan *) hash_plan,
                                                          jointype);
 
+       /*
+        * decide if use runtime filter for this hash join.
+        */
+       join_plan->useRuntimeFilter = decide_use_runtime_filter(best_path, 
jointype);
+
+       /*
+        * recored the estimated number of rows from inner table,
+        * this number is used for the memory size of Bloom filter
+        */
+       if (join_plan->useRuntimeFilter)
+       {
+               join_plan->estimatedInnerNum = best_path->estimatedNumInner;
+       }
+
        /* 
         * MPP-4635.  best_path->jpath.outerjoinpath may be NULL.  
         * From the comment, it is adaptive nestloop join may cause this.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/resourcemanager/include/utils/simplestring.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/utils/simplestring.h 
b/src/backend/resourcemanager/include/utils/simplestring.h
index 2d48780..4849a8e 100644
--- a/src/backend/resourcemanager/include/utils/simplestring.h
+++ b/src/backend/resourcemanager/include/utils/simplestring.h
@@ -87,6 +87,7 @@ bool SimpleStringIsPercentage(SimpStringPtr str);
 int  SimpleStringToPercentage(SimpStringPtr str, int8_t *value);
 /* <integer>mb, <integer>gb, <integer>tb */
 int  SimpleStringToStorageSizeMB(SimpStringPtr str, uint32_t *value);
+int  SimpleStringToBytes(SimpStringPtr str, uint64_t *value);
 
 int  SimpleStringToMapIndexInt8(SimpStringPtr  str,
                                                                char            
   *strlist,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/resourcemanager/utils/simplestring.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/utils/simplestring.c 
b/src/backend/resourcemanager/utils/simplestring.c
index e117680..94df3d6 100644
--- a/src/backend/resourcemanager/utils/simplestring.c
+++ b/src/backend/resourcemanager/utils/simplestring.c
@@ -200,6 +200,41 @@ int  SimpleStringToStorageSizeMB(SimpStringPtr str, 
uint32_t *value)
        return FUNC_RETURN_OK;
 }
 
+int  SimpleStringToBytes(SimpStringPtr str, uint64_t *value)
+{
+       int     tail    = strlen(str->Str) - 1;
+       int     scanres = -1;
+       int32_t val;
+       char    buff[256];
+
+       if ( tail < 2 || tail > sizeof(buff)-1 )
+               return UTIL_SIMPSTRING_WRONG_FORMAT;
+
+       strncpy(buff, str->Str, tail-1);
+       buff[tail-1] = '\0';
+
+       scanres = sscanf(buff, "%d", &val);
+       if ( scanres != 1 )
+               return UTIL_SIMPSTRING_WRONG_FORMAT;
+
+       if ( (str->Str[tail]   == 'b' || str->Str[tail] == 'B' ) &&
+                (str->Str[tail-1] == 'k' || str->Str[tail-1] == 'K') ) {
+               *value = val * 1024;
+       }
+       else if ( (str->Str[tail]   == 'b' || str->Str[tail] == 'B' ) &&
+                (str->Str[tail-1] == 'm' || str->Str[tail-1] == 'M') ) {
+               *value = val * 1024 *1024;
+       }
+       else if ( (str->Str[tail]   == 'b' || str->Str[tail] == 'B' ) &&
+                         (str->Str[tail-1] == 'g' || str->Str[tail-1] == 'G') 
) {
+               *value = val * 1024 * 1024 *1024;
+       }
+       else {
+               return UTIL_SIMPSTRING_WRONG_FORMAT;
+       }
+       return FUNC_RETURN_OK;
+}
+
 int SimpleStringToMapIndexInt8(SimpStringPtr   str,
                                                           char                 
   *strlist,
                                                           int                  
        listsize,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/utils/hash/Makefile
----------------------------------------------------------------------
diff --git a/src/backend/utils/hash/Makefile b/src/backend/utils/hash/Makefile
index bfe658f..9eec20c 100644
--- a/src/backend/utils/hash/Makefile
+++ b/src/backend/utils/hash/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
 
 #override CPPFLAGS :=-msse4.2 $(CPPFLAGS)
 
-OBJS = dynahash.o hashfn.o pg_crc.o
+OBJS = dynahash.o hashfn.o pg_crc.o bloomfilter.o
 
 include $(top_srcdir)/src/backend/common.mk

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/utils/hash/bloomfilter.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/hash/bloomfilter.c 
b/src/backend/utils/hash/bloomfilter.c
new file mode 100644
index 0000000..3d1bd85
--- /dev/null
+++ b/src/backend/utils/hash/bloomfilter.c
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "utils/bloomfilter.h"
+#include "utils/elog.h"
+#include "utils/palloc.h"
+#include "lib/stringinfo.h"
+#include <assert.h>
+#include <math.h>
+
+uint32_t HASH_SEEDS[8] __attribute__((aligned(32))) = { 0x14EBCDFFU,
+        0x2A1C1A99U, 0x85CB78FBU, 0x6E8F82DDU, 0xF8464DFFU, 0x1028FEADU,
+        0x74F04A4DU, 0x1832DB75U };
+
+static uint32_t getBucketIdx(uint32_t hash, uint32_t mask)
+{
+    /* use Knuth's multiplicative hash */
+    return ((uint64_t) (hash) * 2654435769ul >> 32) & mask;
+}
+
+/*
+ * Returns the smallest power of two that is bigger than v.
+ */
+int64_t UpperPowerTwo(int64_t v) {
+    --v;
+    v |= v >> 1;
+    v |= v >> 2;
+    v |= v >> 4;
+    v |= v >> 8;
+    v |= v >> 16;
+    v |= v >> 32;
+    ++v;
+    return v;
+}
+
+/*
+ * Initialize a Bloom filter structure with the memory size of Bloom filter.
+ */
+BloomFilter InitBloomFilter(int memory_size)
+{
+    BloomFilter bf;
+    uint32_t nBuckets = max(1, 
memory_size/(sizeof(BucketWord)*NUM_BUCKET_WORDS));
+    size_t size = nBuckets*NUM_BUCKET_WORDS*sizeof(BucketWord);
+    bf = palloc0(offsetof(BloomFilterData, data)  + size);
+    bf->nInserted = bf->nTested = bf->nMatched = 0;
+    bf->nBuckets = nBuckets;
+    bf->data_mask = bf->nBuckets - 1;
+    bf->data_size = size;
+    bf->isCreated = true;
+    elog(DEBUG3, "Create a Bloom filter with number of buckets:%d, size:%d",
+                 bf->nBuckets, size);
+    return bf;
+}
+
+/*
+ * Insert a value into Bloom filter.
+ */
+void InsertBloomFilter(BloomFilter bf, uint32_t value)
+{
+    uint32_t bucket_idx = getBucketIdx(value, bf->data_mask);
+    uint32_t new_bucket[8];
+    for (int i = 0; i < NUM_BUCKET_WORDS; ++i) {
+        /*
+         * Multiply-shift hashing proposed by Dietzfelbinger et al.
+         * hash a value universally into 5 bits using the random odd seed.
+         */
+        new_bucket[i] = (HASH_SEEDS[i] * value) >> (32 - LOG_BUCKET_WORD_BITS);
+        new_bucket[i] = 1U << new_bucket[i];
+        bf->data[bucket_idx][i] |= new_bucket[i];
+    }
+}
+
+/*
+ * Check whether a value is in this Bloom filter or not.
+ */
+bool FindBloomFilter(BloomFilter bf, uint32_t value)
+{
+    uint32_t bucket_idx = getBucketIdx(value, bf->data_mask);
+    for (int i = 0; i < NUM_BUCKET_WORDS; ++i)
+    {
+        BucketWord hval = (HASH_SEEDS[i] * value) >> (32 - 
LOG_BUCKET_WORD_BITS);
+        hval = 1U << hval;
+        if (!(bf->data[bucket_idx][i] & hval))
+        {
+            return false;
+        }
+    }
+    return true;
+}
+
+void PrintBloomFilter(BloomFilter bf)
+{
+    StringInfo bfinfo = makeStringInfo();
+    appendStringInfo(bfinfo, "##### Print Bloom filter #####\n");
+    appendStringInfo(bfinfo, "data_mask: %x\n ", bf->data_mask);
+    appendStringInfo(bfinfo, "number of buckets: %d\n ", bf->nBuckets);
+    appendStringInfo(bfinfo, "number of inserted: %d\n ", bf->nInserted);
+    appendStringInfo(bfinfo, "number of tested: %d\n ", bf->nTested);
+    appendStringInfo(bfinfo, "number of matched: %d\n ", bf->nMatched);
+    appendStringInfo(bfinfo, "size: %d\n ", bf->data_size);
+    appendStringInfo(bfinfo, "##### END Print Bloom filter #####\n");
+    elog(LOG, "%s", bfinfo->data);
+    if(bfinfo->data != NULL)
+    {
+        pfree(bfinfo->data);
+    }
+}
+
+/*
+ * Destroy a Bloom filter structure.
+ */
+void DestroyBloomFilter(BloomFilter bf)
+{
+    if (bf != NULL)
+    {
+        pfree(bf);
+    }
+    elog(DEBUG3, "Destroy a Bloom filter");
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 64449da..718a8b6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -738,6 +738,7 @@ double        optimizer_cost_threshold;
 double  optimizer_nestloop_factor;
 double  locality_upper_bound;
 double  net_disk_ratio;
+double hawq_hashjoin_bloomfilter_ratio;
 bool           optimizer_cte_inlining;
 int            optimizer_cte_inlining_bound;
 double         optimizer_damping_factor_filter;
@@ -7124,6 +7125,16 @@ static struct config_real ConfigureNamesReal[] =
                0.5, 0.0, 1.0, NULL, NULL
        },
 
+       {
+               {"hawq_hashjoin_bloomfilter_ratio", PGC_USERSET, PRESET_OPTIONS,
+                       gettext_noop("Sets the ratio for hash join Bloom 
filter."),
+                       NULL,
+                       GUC_NO_SHOW_ALL
+               },
+               &hawq_hashjoin_bloomfilter_ratio,
+               0.4, 0.0, 1.0, NULL, NULL
+       },
+
 /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, 0.0, 0.0, 0.0, NULL, NULL
@@ -8252,14 +8263,14 @@ static struct config_string ConfigureNamesString[] =
                "64GB", NULL, NULL
        },
 
-    {
+       {
                {"hawq_global_rm_type", PGC_POSTMASTER, RESOURCES_MGM,
                                gettext_noop("set resource management server 
type"),
                                NULL
                },
                &rm_global_rm_type,
                "none", NULL, NULL
-    },
+       },
 
        {
                {"hawq_rm_yarn_address", PGC_POSTMASTER, RESOURCES_MGM,
@@ -8306,14 +8317,23 @@ static struct config_string ConfigureNamesString[] =
                "", NULL, NULL
        },
 
-    {
-        {"hawq_rm_stmt_vseg_memory", PGC_USERSET, RESOURCES_MGM,
-            gettext_noop("the memory quota of one virtual segment for one 
statement."),
-            NULL
-        },
-        &rm_stmt_vseg_mem_str,
-        "128mb", assign_hawq_rm_stmt_vseg_memory, NULL
-    },
+       {
+               {"hawq_rm_stmt_vseg_memory", PGC_USERSET, RESOURCES_MGM,
+                       gettext_noop("the memory quota of one virtual segment 
for one statement."),
+                       NULL
+               },
+               &rm_stmt_vseg_mem_str,
+               "128mb", assign_hawq_rm_stmt_vseg_memory, NULL
+       },
+
+       {
+               {"hawq_hashjoin_bloomfilter_max_memory_size", PGC_USERSET, 
PRESET_OPTIONS,
+                       gettext_noop("The maximum memory size for bloom filter 
in hash join, with KB or MB"),
+                       GUC_NO_SHOW_ALL
+               },
+               &hawq_hashjoin_bloomfilter_max_memory_size,
+               "2MB", NULL, NULL
+       },
 
        {
                {"hawq_re_cgroup_mount_point", PGC_POSTMASTER, RESOURCES_MGM,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/cdb/cdbvars.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index 7ce988a..e7737a5 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -939,6 +939,15 @@ extern int gp_hashagg_spillbatch_max;
 /* Hashjoin use bloom filter */
 extern int hawq_hashjoin_bloomfilter;
 
+/* Maximum memory size for one Bloom filter */
+extern char* hawq_hashjoin_bloomfilter_max_memory_size;
+/*
+ * This guc value controls the ratio of (number of hash join tuples)/(number 
of tuples of outer table),
+ * since the bloomfilter creation/check has cost, it is meaningless to use 
bloomfilter if most of tuples of outer table
+ * can't be filtered. So, if the estimated value is lower than this guc, then 
we can apply bloomfilter on hash join.
+ */
+extern double hawq_hashjoin_bloomfilter_ratio;
+
 /* Get statistics for partitioned parent from a child */
 extern bool    gp_statistics_pullup_from_child_partition;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/executor/hashjoin.h
----------------------------------------------------------------------
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 208aed6..3a90d2a 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -168,6 +168,8 @@ typedef struct HashJoinTableData
        /* buckets[i] is head of list of tuples in i'th in-memory bucket */
        struct HashJoinTupleData **buckets;
 
+       BloomFilter bloomfilter;
+
        /* buckets array is per-batch storage, as are all the tuples */
 
        int                     nbatch;                 /* number of batches */
@@ -201,6 +203,7 @@ typedef struct HashJoinTableData
 
        MemoryContext hashCxt;          /* context for whole-hash-join storage 
*/
        MemoryContext batchCxt;         /* context for this-batch-only storage 
*/
+       MemoryContext bloomfilterCtx; /* context for Bloom filter */
        MemoryContext bfCxt;            /* CDB */ /* context for temp buf file 
*/
 
     HashJoinTableStats *stats;  /* statistics workarea for EXPLAIN ANALYZE */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/nodes/execnodes.h
----------------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index cf1b974..515043b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -45,7 +45,7 @@
 #include "utils/relcache.h"
 #include "gpmon/gpmon.h"                /* gpmon_packet_t */
 #include "utils/memaccounting.h"
-
+#include "utils/bloomfilter.h"
 
 /*
  * Currently, since grouping is defined as uint64 internally, it limits the
@@ -2117,6 +2117,8 @@ typedef struct HashJoinState
         bool workfiles_created;
         /* number of batches when we loaded from the state. -1 means not 
loaded yet */
         int nbatch_loaded_state;
+        bool useRuntimeFilter;
+        int  estimatedInnerNum;
 
 } HashJoinState;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/nodes/plannodes.h
----------------------------------------------------------------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index cbf48d8..3fd1922 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -823,6 +823,8 @@ typedef struct HashJoin
        Join            join;
        List       *hashclauses;
        List       *hashqualclauses;
+       bool            useRuntimeFilter;
+       int             estimatedInnerNum;
 } HashJoin;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/nodes/relation.h
----------------------------------------------------------------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 0d42f97..95f83f0 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1067,6 +1067,8 @@ typedef struct HashPath
 {
        JoinPath        jpath;
        List       *path_hashclauses;           /* join clauses used for 
hashing */
+       double          hashjoin_ratio;
+       double          estimatedNumInner;
 } HashPath;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d8c8eb67/src/include/utils/bloomfilter.h
----------------------------------------------------------------------
diff --git a/src/include/utils/bloomfilter.h b/src/include/utils/bloomfilter.h
new file mode 100644
index 0000000..8b91a90
--- /dev/null
+++ b/src/include/utils/bloomfilter.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef BLOOMFILTER_H
+#define BLOOMFILTER_H
+#include <stdint.h>
+#include "c.h"
+
+/*
+ * Blocked Bloom filter, proposed by FELIX PUTZE et al, in paper
+ * Cache-, Hash- and Space-Efficient Bloom Filters.
+ * The idea is to divide Bloom filter into several buckets(blocks), each value 
inserted
+ * into the Bloom filter, will be hashed into one bucket. A bucket contains 
fixed-length bits.
+ * This implementation refers to Impala's Bloom filter.
+ */
+#define NUM_BUCKET_WORDS 8
+typedef uint32_t BucketWord;
+typedef BucketWord BUCKET[NUM_BUCKET_WORDS];
+
+/* log2(number of bits in a BucketWord) */
+#define LOG_BUCKET_WORD_BITS 5
+
+typedef struct BloomFilterData
+{
+    bool     isCreated;
+    uint32_t nBuckets;
+    uint32_t nInserted;
+    uint32_t nTested;
+    uint32_t nMatched;
+    size_t   data_size;
+    uint32_t data_mask;
+    BUCKET   data[1];
+} BloomFilterData;
+typedef BloomFilterData *BloomFilter;
+
+extern int64_t UpperPowerTwo(int64_t v);
+extern BloomFilter InitBloomFilter(int memory_size);
+extern void InsertBloomFilter(BloomFilter bf, uint32_t value);
+extern bool FindBloomFilter(BloomFilter bf, uint32_t value);
+extern void PrintBloomFilter(BloomFilter bf);
+extern void DestroyBloomFilter(BloomFilter bf);
+
+#endif

Reply via email to