Repository: incubator-madlib
Updated Branches:
  refs/heads/master 755c7b70d -> 186aefa6d


Bugfix: Fix multiple bugs and perf issue in grouping

JIRA: MADLIB-1100, MADLIB-1107

The default threshold value was hard-coded to 1e-5 instead of a
value dependent on the number of vertices, 1/(num_vertices*1000).
The older docs said it would be 1/(num_vertices*100), but the latest
experiments showed that even that value would converge in about 5
iterations, so 1000 is now used instead of 100.
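
For illustration, the default described above can be sketched as a
small standalone helper. The function name default_threshold and the
input check are assumptions made for this sketch; in the commit itself
the computation is inline in pagerank().

```python
def default_threshold(nvertices, threshold=None):
    """Return the caller's threshold, or 1/(N*1000) when none is given.

    Hypothetical helper for illustration only; it mirrors the inline
    `if threshold is None` branch shown in the diff.
    """
    if threshold is not None:
        return threshold
    if nvertices <= 0:
        # Assumption: an empty graph has no meaningful default.
        raise ValueError("nvertices must be positive")
    return 1.0 / (nvertices * 1000)


if __name__ == "__main__":
    # The default shrinks as the graph grows.
    print(default_threshold(7))
    print(default_threshold(1000000))
    # An explicit value, including 0 (run all iterations), is kept as-is.
    print(default_threshold(7, threshold=0))
```

Because the check is `is not None` rather than a truthiness test, an
explicit threshold of 0 is preserved instead of being replaced by the
default.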

Remove hard coded schema name in install check.

This commit also includes performance improvements:
- The intermediate and temp edge tables are now distributed by
both the grouping cols and the dest vertex, which is expected to
perform better on MPP databases such as Greenplum and HAWQ.
- The query that inserts random_prob for nodes that have no
incoming edges was inefficient, especially with grouping. The
updated query significantly reduces the run time with grouping.
The lookup of those nodes now runs once, outside the for loop,
since the graph does not change.
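
The two changes above can be sketched in plain Python. This is only an
illustration under stated assumptions: the helper names
distribution_clause and sources_without_incoming are hypothetical, and
the commit does the node lookup in SQL, not with in-memory sets.

```python
def distribution_clause(dest, grouping_cols=None):
    """Build a DISTRIBUTED BY clause over the grouping cols plus dest.

    grouping_cols is a comma-separated string of column names, or None
    when no grouping is used.
    """
    if grouping_cols:
        cols = "{0},{1}".format(grouping_cols, dest)
    else:
        cols = dest
    return "DISTRIBUTED BY ({0})".format(cols)


def sources_without_incoming(edges):
    """Return the (group, vertex) pairs that appear as a source but
    never as a destination within the same group.

    edges is an iterable of (group, src, dest) tuples. The graph does
    not change between iterations, so this can be computed once before
    the iteration loop and reused.
    """
    edges = list(edges)
    has_incoming = set((g, d) for g, _, d in edges)
    sources = set((g, s) for g, s, _ in edges)
    return sorted(sources - has_incoming)


if __name__ == "__main__":
    print(distribution_clause("dest", "user_id"))
    sample = [("a", 1, 2), ("a", 2, 3), ("b", 3, 1)]
    # Computed once, then reused in every iteration.
    print(sources_without_incoming(sample))
```

Keying the lookup on (group, vertex) rather than on the vertex alone
matters because the same vertex id can have incoming edges in one
group and none in another.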

Closes #137


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/186aefa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/186aefa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/186aefa6

Branch: refs/heads/master
Commit: 186aefa6dabbf58ed46a125b876cca4e5b5e2422
Parents: 755c7b7
Author: Nandish Jayaram <njaya...@apache.org>
Authored: Fri May 19 11:00:03 2017 -0700
Committer: Nandish Jayaram <njaya...@apache.org>
Committed: Tue May 30 09:36:40 2017 -0700

----------------------------------------------------------------------
 doc/design/modules/graph.tex                    |  4 +-
 src/ports/postgres/modules/graph/pagerank.py_in | 93 ++++++++++++++------
 .../postgres/modules/graph/pagerank.sql_in      |  8 +-
 .../postgres/modules/graph/test/pagerank.sql_in |  4 +-
 4 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/doc/design/modules/graph.tex
----------------------------------------------------------------------
diff --git a/doc/design/modules/graph.tex b/doc/design/modules/graph.tex
index 1d0233c..ec68743 100644
--- a/doc/design/modules/graph.tex
+++ b/doc/design/modules/graph.tex
@@ -399,7 +399,7 @@ WHERE ABS(previous.pagerank - cur.pagerank) > threshold
 The pagerank module in MADlib has a few optional parameters: damping factor
 $d$, number of iterations $max$, and the threshold for convergence $threshold$.
 The default values for these parameters when not specified by the user are
-$0.85$, $100$ and $\frac{1}{N*100}$ respectively.
+$0.85$, $100$ and $\frac{1}{N*1000}$ respectively.
 
 The damping factor denotes the probability with which the surfer uses the edges
 to traverse the graph. If set to $0$, it implies that the only way a surfer
@@ -412,7 +412,7 @@ The convergence test for PageRank in MADlib checks for the delta between
 the PageRank scores of a vertex across two consecutive iterations. Since
 the initial value of the PageRank score is set to $\frac{1}{N}$, the delta
 will be small in the initial iterations when $N$ is large (say over 100
-million). We thus set the threshold to $\frac{1}{N*100}$, and it is to be
+million). We thus set the threshold to $\frac{1}{N*1000}$, and it is to be
 noted that this is not based on any experimental study. Users of MADlib are
 encouraged to consider this factor when setting a value for threshold, since
 a high $threshold$ value would lead to early termination of PageRank

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/pagerank.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in
index 202e536..4ef5876 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -116,7 +116,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     # value based on number of nodes in the graph.
     # NOTE: The heuristic below is not based on any scientific evidence.
     if threshold is None:
-        threshold = 1.0/(nvertices*100)
+        threshold = 1.0/(nvertices*1000)
 
     # table/column names used when grouping_cols is set.
     distinct_grp_table = ''
@@ -128,7 +128,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
 
     edge_temp_table = unique_string(desp='temp_edge')
     distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0})".format(dest)!>)
+        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
+            if grouping_cols else '', dest)!>)
     plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
     plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
         SELECT * FROM {edge_table}
@@ -151,8 +152,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     out_cnts_cnt = unique_string(desp='cnt')
     v1 = unique_string(desp='v1')
 
+    cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
+            if grouping_cols else '', vertex_id)!>)
     cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-            <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+            <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
+            if grouping_cols else '', vertex_id)!>)
     cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
         """.format(**locals())
     out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
@@ -178,6 +183,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
     message_grp_where_ins = ''
     ignore_group_clause_ins = ''
 
+    nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
+    ignore_group_clause_ins_noincoming = ''
+
     grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
     group_by_grouping_cols_conv = ''
     message_grp_clause_conv = ''
@@ -192,7 +200,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         distinct_grp_table = unique_string(desp='grp')
         plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
         plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
-                SELECT DISTINCT {grouping_cols} FROM {edge_table}
+                SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
             """.format(**locals()))
         vertices_per_group = unique_string(desp='nvert_grp')
         init_pr = unique_string(desp='init')
@@ -213,9 +221,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob}
                 FROM {distinct_grp_table} INNER JOIN (
                     SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_table}
+                    FROM {edge_temp_table}
                     UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_table}
+                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                 ){subq}
                 ON {grouping_where_clause}
                 GROUP BY {group_by_clause}
@@ -232,11 +240,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                        {init_pr} AS pagerank
                 FROM {vertices_per_group} INNER JOIN (
                     SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_table}
+                    FROM {edge_temp_table}
                     UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_table}
+                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                 ){subq}
                 ON {grouping_where_clause}
+                {cur_distribution}
             """.format(**locals()))
         vpg = unique_string(desp='vpg')
         # Compute the out-degree of every node in the group-based subgraphs.
@@ -244,7 +253,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
             SELECT {grouping_cols_select} {src} AS {vertex_id},
                    COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_table}
+            FROM {edge_temp_table}
             GROUP BY {grouping_cols_select} {src}
             {cnts_distribution}
             """.format(grouping_cols_select=grouping_cols+','
@@ -265,8 +274,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         vpg_join_clause = ' AND '.join(
             ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
                 for col in grouping_cols_list])
-        vpg_cur_join_clause = ' AND '.join(
-            ["{cur}.{col}={vpg}.{col}".format(**locals())
+        vpg_t1_join_clause = ' AND '.join(
+            ["__t1__.{col}={vpg}.{col}".format(**locals())
                 for col in grouping_cols_list])
         # join clause specific to populating random_prob for nodes without any
         # incoming edges.
@@ -311,12 +320,14 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             AS {vpg} ON {vpg_join_clause}""".format(**locals())
         ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table,
             edge_temp_table, grouping_cols_list)
+        ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups(
+            summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
         # Strings required for updating PageRank scores of vertices that have
         # no incoming edges
         grouping_cols_select_ins = cur_grouping_cols_select+','
         vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
             **locals())
-        vpg_where_clause_ins = '{vpg_cur_join_clause} AND '.format(
+        vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
             **locals())
         message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
         ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
@@ -329,6 +340,27 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
             cur, grouping_cols_list)
         limit = ''
+
+        # Find all nodes, in each group, that have no incoming edges. The PageRank
+        # value of these nodes are not updated using the first query in the
+        # following for loop. They must be explicitly plugged back in to the
+        # message table, with their corresponding group's random_prob as their
+        # PageRank values.
+        plpy.execute("""
+                CREATE TABLE {nodes_with_no_incoming_edges} AS
+                SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
+                        {vpg}.{random_prob} AS pagerank
+                FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
+                WHERE NOT EXISTS (
+                    SELECT 1
+                    FROM {edge_temp_table} AS __t2__
+                    WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
+                ) {vpg_where_clause_ins}
+            """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
+                        for col in grouping_cols_list]),
+                    where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
+                        for col in grouping_cols_list]),
+                    **locals()))
     else:
         # cur and out_cnts tables can be simpler when no grouping is involved.
         init_value = 1.0/nvertices
@@ -336,17 +368,17 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                 CREATE TEMP TABLE {cur} AS
                 SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
                 FROM {vertex_table}
+                {cur_distribution}
             """.format(**locals()))
 
         # Compute the out-degree of every node in the graph.
         plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
         plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
             SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_table}
+            FROM {edge_temp_table}
             GROUP BY {src}
             {cnts_distribution}
             """.format(**locals()))
-
         # The summary table when there is no grouping will contain only
         # the iteration column. We don't need to create the out_table
         # when no grouping is used since the 'cur' table will be renamed
@@ -356,6 +388,16 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                     __iterations__ INTEGER
                 )
             """.format(**locals()))
+        # Find all nodes in the graph that don't have any incoming edges and
+        # assign random_prob as their pagerank values.
+        plpy.execute("""
+                CREATE TABLE {nodes_with_no_incoming_edges} AS
+                SELECT DISTINCT({src}), {random_probability} AS pagerank
+                FROM {edge_temp_table}
+                EXCEPT
+                    (SELECT DISTINCT({dest}), {random_probability} AS pagerank
+                    FROM {edge_temp_table})
+            """.format(**locals()))
     unconverged = 0
     iteration_num = 0
     for iteration_num in range(max_iter):
@@ -389,6 +431,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
                     {vertices_per_group_inner_join_pr}
                 {ignore_group_clause}
                 GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
+                {cur_distribution}
             """.format(ignore_group_clause=ignore_group_clause_pr
                     if iteration_num>0 else ignore_group_clause_first,
                 **locals()))
@@ -397,20 +440,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
         # with random_prob.
         plpy.execute("""
                 INSERT INTO {message}
-                SELECT {grouping_cols_select_ins} {cur}.{vertex_id},
-                    {random_jump_prob} AS pagerank
-                FROM {cur} {vpg_from_clause_ins}
-                WHERE {vpg_where_clause_ins} {vertex_id} NOT IN (
-                    SELECT {vertex_id}
-                    FROM {message}
-                    {message_grp_where_ins}
-                )
+                SELECT *
+                FROM {nodes_with_no_incoming_edges}
                 {ignore_group_clause}
-                GROUP BY {grouping_cols_select_ins} {cur}.{vertex_id}
-            """.format(ignore_group_clause=ignore_group_clause_ins
+            """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
                     if iteration_num>0 else ignore_group_clause_first,
-                **locals()))
-
+                    **locals()))
         # Check for convergence:
         ## Check for convergence only if threshold != 0.
         if threshold != 0:
@@ -482,9 +517,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             """.format(**locals()))
 
     ## Step 4: Cleanup
-    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
+    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6};
         """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
-                    message_unconv))
+                    message_unconv, nodes_with_no_incoming_edges))
     if grouping_cols:
         plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
             """.format(vertices_per_group, temp_summary_table,
@@ -570,7 +605,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
     damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
                                      -- (DEFAULT = 0.85)
     max_iter      INTEGER, -- Maximum iteration number (DEFAULT = 100)
-    threshold     DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*100),
+    threshold     DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000),
                                     -- N is number of vertices in the graph)
     grouping_col  TEXT -- Comma separated column names to group on
                        -- (DEFAULT = NULL, no grouping)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/pagerank.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.sql_in b/src/ports/postgres/modules/graph/pagerank.sql_in
index 6531bb5..e028f92 100644
--- a/src/ports/postgres/modules/graph/pagerank.sql_in
+++ b/src/ports/postgres/modules/graph/pagerank.sql_in
@@ -107,7 +107,7 @@ parameter.
 <dd>INTEGER, default: 100. The maximum number of iterations allowed.</dd>
 
 <dt>threshold</dt>
-<dd>FLOAT8, default: (1/number of vertices * 100). If the difference between the PageRank of every vertex of two consecutive
+<dd>FLOAT8, default: (1/number of vertices * 1000). If the difference between the PageRank of every vertex of two consecutive
 iterations is smaller than 'threshold', or the iteration number is larger than 'max_iter', the
 computation stops.  If you set the threshold to zero, then you will force the algorithm to run for the full number of iterations specified in 'max_iter'.
 It is advisable to set threshold to a value lower than 1/(number of vertices in the graph) since the PageRank value of nodes is initialized to that
@@ -322,7 +322,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank(
     damping_factor  FLOAT8,
     max_iter        INTEGER
 ) RETURNS VOID AS $$
-    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, $7, 0.00001, NULL)
+    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, $7, NULL, NULL)
 $$ LANGUAGE SQL
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 -------------------------------------------------------------------------
@@ -334,7 +334,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank(
     out_table       TEXT,
     damping_factor  FLOAT8
 ) RETURNS VOID AS $$
-    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, 100, 0.00001, NULL)
+    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, 100, NULL, NULL)
 $$ LANGUAGE SQL
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 -------------------------------------------------------------------------
@@ -345,7 +345,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank(
     edge_args       TEXT,
     out_table       TEXT
 ) RETURNS VOID AS $$
-    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, 0.85, 100, 0.00001, NULL)
+    SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, 0.85, 100, NULL, NULL)
 $$ LANGUAGE SQL
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 -------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/test/pagerank.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/test/pagerank.sql_in b/src/ports/postgres/modules/graph/test/pagerank.sql_in
index 3ccfdd1..d3bc32b 100644
--- a/src/ports/postgres/modules/graph/test/pagerank.sql_in
+++ b/src/ports/postgres/modules/graph/test/pagerank.sql_in
@@ -61,7 +61,7 @@ INSERT INTO edge VALUES
 (6, 3, 2);
 
 DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
-SELECT madlib.pagerank(
+SELECT pagerank(
              'vertex',        -- Vertex table
              'id',            -- Vertix id column
              'edge',          -- Edge table
@@ -75,7 +75,7 @@ SELECT assert(relative_error(SUM(pagerank), 1) < 0.00001,
 
 DROP TABLE IF EXISTS pagerank_gr_out;
 DROP TABLE IF EXISTS pagerank_gr_out_summary;
-SELECT madlib.pagerank(
+SELECT pagerank(
              'vertex',        -- Vertex table
              'id',            -- Vertix id column
              'edge',          -- Edge table
