Repository: incubator-madlib Updated Branches: refs/heads/master 755c7b70d -> 186aefa6d
Bugfix: Fix multiple bugs and perf issue in grouping JIRA: MADLIB-1100, MADLIB-1107 The default threshold value was set to 1e-5 instead of a value dependent on the number of vertices (1/(num_vertices*1000)). The older docs said it would be (1/(number_vertices*100)), but latest experiments showed even that would converge in about 5 iterations. So using 1000 instead of 100 now. Remove hard coded schema name in install check. This commit includes changes for performance improvement too. - It distributes intermediate and temp edge table by both grouping cols and the dest vertex, hoping it would perform better with MPP databases such as Greenplum and HAWQ. - The query that inserts random_prob for nodes that have no incoming edges was inefficient, especially with grouping. The updated query significantly reduces the time with grouping. That query is now run outside the for loop since the graph will not change. Closes #137 Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/186aefa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/186aefa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/186aefa6 Branch: refs/heads/master Commit: 186aefa6dabbf58ed46a125b876cca4e5b5e2422 Parents: 755c7b7 Author: Nandish Jayaram <njaya...@apache.org> Authored: Fri May 19 11:00:03 2017 -0700 Committer: Nandish Jayaram <njaya...@apache.org> Committed: Tue May 30 09:36:40 2017 -0700 ---------------------------------------------------------------------- doc/design/modules/graph.tex | 4 +- src/ports/postgres/modules/graph/pagerank.py_in | 93 ++++++++++++++------ .../postgres/modules/graph/pagerank.sql_in | 8 +- .../postgres/modules/graph/test/pagerank.sql_in | 4 +- 4 files changed, 72 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/doc/design/modules/graph.tex ---------------------------------------------------------------------- diff --git a/doc/design/modules/graph.tex b/doc/design/modules/graph.tex index 1d0233c..ec68743 100644 --- a/doc/design/modules/graph.tex +++ b/doc/design/modules/graph.tex @@ -399,7 +399,7 @@ WHERE ABS(previous.pagerank - cur.pagerank) > threshold The pagerank module in MADlib has a few optional parameters: damping factor $d$, number of iterations $max$, and the threshold for convergence $threshold$. The default values for these parameters when not specified by the user are -$0.85$, $100$ and $\frac{1}{N*100}$ respectively. +$0.85$, $100$ and $\frac{1}{N*1000}$ respectively. The damping factor denotes the probability with which the surfer uses the edges to traverse the graph. If set to $0$, it implies that the only way a surfer @@ -412,7 +412,7 @@ The convergence test for PageRank in MADlib checks for the delta between the PageRank scores of a vertex across two consecutive iterations. Since the initial value of the PageRank score is set to $\frac{1}{N}$, the delta will be small in the initial iterations when $N$ is large (say over 100 -million). We thus set the threshold to $\frac{1}{N*100}$, and it is to be +million). We thus set the threshold to $\frac{1}{N*1000}$, and it is to be noted that this is not based on any experimental study. 
Users of MADlib are encouraged to consider this factor when setting a value for threshold, since a high $threshold$ value would lead to early termination of PageRank http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/pagerank.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/pagerank.py_in b/src/ports/postgres/modules/graph/pagerank.py_in index 202e536..4ef5876 100644 --- a/src/ports/postgres/modules/graph/pagerank.py_in +++ b/src/ports/postgres/modules/graph/pagerank.py_in @@ -116,7 +116,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, # value based on number of nodes in the graph. # NOTE: The heuristic below is not based on any scientific evidence. if threshold is None: - threshold = 1.0/(nvertices*100) + threshold = 1.0/(nvertices*1000) # table/column names used when grouping_cols is set. distinct_grp_table = '' @@ -128,7 +128,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, edge_temp_table = unique_string(desp='temp_edge') distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0})".format(dest)!>) + <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+',' + if grouping_cols else '', dest)!>) plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table)) plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS SELECT * FROM {edge_table} @@ -151,8 +152,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_cnts_cnt = unique_string(desp='cnt') v1 = unique_string(desp='v1') + cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+',' + if grouping_cols else '', vertex_id)!>) cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) + <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+',' + if grouping_cols else '', vertex_id)!>) 
cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id} """.format(**locals()) out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src} @@ -178,6 +183,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, message_grp_where_ins = '' ignore_group_clause_ins = '' + nodes_with_no_incoming_edges = unique_string(desp='no_incoming') + ignore_group_clause_ins_noincoming = '' + grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id) group_by_grouping_cols_conv = '' message_grp_clause_conv = '' @@ -192,7 +200,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, distinct_grp_table = unique_string(desp='grp') plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table)) plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS - SELECT DISTINCT {grouping_cols} FROM {edge_table} + SELECT DISTINCT {grouping_cols} FROM {edge_temp_table} """.format(**locals())) vertices_per_group = unique_string(desp='nvert_grp') init_pr = unique_string(desp='init') @@ -213,9 +221,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS {random_prob} FROM {distinct_grp_table} INNER JOIN ( SELECT {grouping_cols}, {src} AS __vertices__ - FROM {edge_table} + FROM {edge_temp_table} UNION - SELECT {grouping_cols}, {dest} FROM {edge_table} + SELECT {grouping_cols}, {dest} FROM {edge_temp_table} ){subq} ON {grouping_where_clause} GROUP BY {group_by_clause} @@ -232,11 +240,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, {init_pr} AS pagerank FROM {vertices_per_group} INNER JOIN ( SELECT {grouping_cols}, {src} AS __vertices__ - FROM {edge_table} + FROM {edge_temp_table} UNION - SELECT {grouping_cols}, {dest} FROM {edge_table} + SELECT {grouping_cols}, {dest} FROM {edge_temp_table} ){subq} ON {grouping_where_clause} + {cur_distribution} """.format(**locals())) vpg = unique_string(desp='vpg') # Compute the 
out-degree of every node in the group-based subgraphs. @@ -244,7 +253,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, plpy.execute("""CREATE TEMP TABLE {out_cnts} AS SELECT {grouping_cols_select} {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt} - FROM {edge_table} + FROM {edge_temp_table} GROUP BY {grouping_cols_select} {src} {cnts_distribution} """.format(grouping_cols_select=grouping_cols+',' @@ -265,8 +274,8 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, vpg_join_clause = ' AND '.join( ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals()) for col in grouping_cols_list]) - vpg_cur_join_clause = ' AND '.join( - ["{cur}.{col}={vpg}.{col}".format(**locals()) + vpg_t1_join_clause = ' AND '.join( + ["__t1__.{col}={vpg}.{col}".format(**locals()) for col in grouping_cols_list]) # join clause specific to populating random_prob for nodes without any # incoming edges. @@ -311,12 +320,14 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, AS {vpg} ON {vpg_join_clause}""".format(**locals()) ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table, edge_temp_table, grouping_cols_list) + ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups( + summary_table, nodes_with_no_incoming_edges, grouping_cols_list) # Strings required for updating PageRank scores of vertices that have # no incoming edges grouping_cols_select_ins = cur_grouping_cols_select+',' vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format( **locals()) - vpg_where_clause_ins = '{vpg_cur_join_clause} AND '.format( + vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format( **locals()) message_grp_where_ins = 'WHERE {message_grp}'.format(**locals()) ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table, @@ -329,6 +340,27 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table, cur, 
grouping_cols_list) limit = '' + + # Find all nodes, in each group, that have no incoming edges. The PageRank + # value of these nodes are not updated using the first query in the + # following for loop. They must be explicitly plugged back in to the + # message table, with their corresponding group's random_prob as their + # PageRank values. + plpy.execute(""" + CREATE TABLE {nodes_with_no_incoming_edges} AS + SELECT {select_group_cols}, __t1__.{src} AS {vertex_id}, + {vpg}.{random_prob} AS pagerank + FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins} + WHERE NOT EXISTS ( + SELECT 1 + FROM {edge_temp_table} AS __t2__ + WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause} + ) {vpg_where_clause_ins} + """.format(select_group_cols=','.join(['__t1__.{0}'.format(col) + for col in grouping_cols_list]), + where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col) + for col in grouping_cols_list]), + **locals())) else: # cur and out_cnts tables can be simpler when no grouping is involved. init_value = 1.0/nvertices @@ -336,17 +368,17 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, CREATE TEMP TABLE {cur} AS SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank FROM {vertex_table} + {cur_distribution} """.format(**locals())) # Compute the out-degree of every node in the graph. plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts)) plpy.execute("""CREATE TEMP TABLE {out_cnts} AS SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt} - FROM {edge_table} + FROM {edge_temp_table} GROUP BY {src} {cnts_distribution} """.format(**locals())) - # The summary table when there is no grouping will contain only # the iteration column. 
We don't need to create the out_table # when no grouping is used since the 'cur' table will be renamed @@ -356,6 +388,16 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, __iterations__ INTEGER ) """.format(**locals())) + # Find all nodes in the graph that don't have any incoming edges and + # assign random_prob as their pagerank values. + plpy.execute(""" + CREATE TABLE {nodes_with_no_incoming_edges} AS + SELECT DISTINCT({src}), {random_probability} AS pagerank + FROM {edge_temp_table} + EXCEPT + (SELECT DISTINCT({dest}), {random_probability} AS pagerank + FROM {edge_temp_table}) + """.format(**locals())) unconverged = 0 iteration_num = 0 for iteration_num in range(max_iter): @@ -389,6 +431,7 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, {vertices_per_group_inner_join_pr} {ignore_group_clause} GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest} + {cur_distribution} """.format(ignore_group_clause=ignore_group_clause_pr if iteration_num>0 else ignore_group_clause_first, **locals())) @@ -397,20 +440,12 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, # with random_prob. plpy.execute(""" INSERT INTO {message} - SELECT {grouping_cols_select_ins} {cur}.{vertex_id}, - {random_jump_prob} AS pagerank - FROM {cur} {vpg_from_clause_ins} - WHERE {vpg_where_clause_ins} {vertex_id} NOT IN ( - SELECT {vertex_id} - FROM {message} - {message_grp_where_ins} - ) + SELECT * + FROM {nodes_with_no_incoming_edges} {ignore_group_clause} - GROUP BY {grouping_cols_select_ins} {cur}.{vertex_id} - """.format(ignore_group_clause=ignore_group_clause_ins + """.format(ignore_group_clause=ignore_group_clause_ins_noincoming if iteration_num>0 else ignore_group_clause_first, - **locals())) - + **locals())) # Check for convergence: ## Check for convergence only if threshold != 0. 
if threshold != 0: @@ -482,9 +517,9 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, """.format(**locals())) ## Step 4: Cleanup - plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5}; + plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}; """.format(out_cnts, edge_temp_table, cur, message, cur_unconv, - message_unconv)) + message_unconv, nodes_with_no_incoming_edges)) if grouping_cols: plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}; """.format(vertices_per_group, temp_summary_table, @@ -570,7 +605,7 @@ def pagerank_help(schema_madlib, message, **kwargs): damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model -- (DEFAULT = 0.85) max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100) - threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*100), + threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000), -- N is number of vertices in the graph) grouping_col TEXT -- Comma separated column names to group on -- (DEFAULT = NULL, no grouping) http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/pagerank.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/pagerank.sql_in b/src/ports/postgres/modules/graph/pagerank.sql_in index 6531bb5..e028f92 100644 --- a/src/ports/postgres/modules/graph/pagerank.sql_in +++ b/src/ports/postgres/modules/graph/pagerank.sql_in @@ -107,7 +107,7 @@ parameter. <dd>INTEGER, default: 100. The maximum number of iterations allowed.</dd> <dt>threshold</dt> -<dd>FLOAT8, default: (1/number of vertices * 100). If the difference between the PageRank of every vertex of two consecutive +<dd>FLOAT8, default: (1/number of vertices * 1000). If the difference between the PageRank of every vertex of two consecutive iterations is smaller than 'threshold', or the iteration number is larger than 'max_iter', the computation stops. 
If you set the threshold to zero, then you will force the algorithm to run for the full number of iterations specified in 'max_iter'. It is advisable to set threshold to a value lower than 1/(number of vertices in the graph) since the PageRank value of nodes is initialized to that @@ -322,7 +322,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank( damping_factor FLOAT8, max_iter INTEGER ) RETURNS VOID AS $$ - SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, $7, 0.00001, NULL) + SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, $7, NULL, NULL) $$ LANGUAGE SQL m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); ------------------------------------------------------------------------- @@ -334,7 +334,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank( out_table TEXT, damping_factor FLOAT8 ) RETURNS VOID AS $$ - SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, 100, 0.00001, NULL) + SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, $6, 100, NULL, NULL) $$ LANGUAGE SQL m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); ------------------------------------------------------------------------- @@ -345,7 +345,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.pagerank( edge_args TEXT, out_table TEXT ) RETURNS VOID AS $$ - SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, 0.85, 100, 0.00001, NULL) + SELECT MADLIB_SCHEMA.pagerank($1, $2, $3, $4, $5, 0.85, 100, NULL, NULL) $$ LANGUAGE SQL m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); ------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/186aefa6/src/ports/postgres/modules/graph/test/pagerank.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/test/pagerank.sql_in b/src/ports/postgres/modules/graph/test/pagerank.sql_in index 3ccfdd1..d3bc32b 100644 --- a/src/ports/postgres/modules/graph/test/pagerank.sql_in +++ 
b/src/ports/postgres/modules/graph/test/pagerank.sql_in @@ -61,7 +61,7 @@ INSERT INTO edge VALUES (6, 3, 2); DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary; -SELECT madlib.pagerank( +SELECT pagerank( 'vertex', -- Vertex table 'id', -- Vertix id column 'edge', -- Edge table @@ -75,7 +75,7 @@ SELECT assert(relative_error(SUM(pagerank), 1) < 0.00001, DROP TABLE IF EXISTS pagerank_gr_out; DROP TABLE IF EXISTS pagerank_gr_out_summary; -SELECT madlib.pagerank( +SELECT pagerank( 'vertex', -- Vertex table 'id', -- Vertix id column 'edge', -- Edge table