[
https://issues.apache.org/jira/browse/MADLIB-1082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968031#comment-15968031
]
ASF GitHub Bot commented on MADLIB-1082:
----------------------------------------
Github user njayaram2 commented on a diff in the pull request:
https://github.com/apache/incubator-madlib/pull/112#discussion_r111456870
--- Diff: src/ports/postgres/modules/graph/pagerank.py_in ---
@@ -158,44 +313,198 @@ def pagerank(schema_madlib, vertex_table, vertex_id,
edge_table, edge_args,
# https://en.wikipedia.org/wiki/PageRank#Damping_factor
# The query below computes the PageRank of each node using the
above formula.
+ # A small explanatory note on ignore_group_clause:
+ # This is used only when grouping is set. This essentially will
have
+ # the condition that will help skip the PageRank computation on
groups
+ # that have converged.
plpy.execute("""
CREATE TABLE {message} AS
- SELECT {edge_temp_table}.{dest} AS {vertex_id},
-
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_prob} AS
pagerank
+ SELECT {grouping_cols_select} {edge_temp_table}.{dest} AS
{vertex_id},
+
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob}
AS pagerank
FROM {edge_temp_table}
- INNER JOIN {cur} ON
{edge_temp_table}.{dest}={cur}.{vertex_id}
- INNER JOIN {out_cnts} ON
{out_cnts}.{vertex_id}={edge_temp_table}.{src}
- INNER JOIN {cur} AS {v1} ON
{v1}.{vertex_id}={edge_temp_table}.{src}
- GROUP BY {edge_temp_table}.{dest}
- """.format(**locals()))
+ INNER JOIN {cur} ON {cur_join_clause}
+ INNER JOIN {out_cnts} ON {out_cnts_join_clause}
+ INNER JOIN {cur} AS {v1} ON {v1_join_clause}
+ {vertices_per_group_inner_join}
+ {ignore_group_clause}
+ GROUP BY {grouping_cols_select} {edge_temp_table}.{dest}
+ """.format(grouping_cols_select=edge_grouping_cols_select+', '
+ if grouping_cols else '',
+
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
+ if grouping_cols else random_probability,
+ vertices_per_group_inner_join="""INNER JOIN
{vertices_per_group}
+ AS {vpg} ON {vpg_join_clause}""".format(**locals())
+ if grouping_cols else '',
+ ignore_group_clause=' WHERE '+get_ignore_groups(
+ summary_table, edge_temp_table, grouping_cols_list)
+ if iteration_num>0 and grouping_cols else '',
+ **locals()))
# If there are nodes that have no incoming edges, they are not
captured in the message table.
# Insert entries for such nodes, with random_prob.
plpy.execute("""
INSERT INTO {message}
- SELECT {vertex_id}, {random_prob}::DOUBLE PRECISION AS
pagerank
- FROM {cur}
- WHERE {vertex_id} NOT IN (
+ SELECT {grouping_cols_select} {cur}.{vertex_id},
{random_jump_prob} AS pagerank
+ FROM {cur} {vpg_from_clause}
+ WHERE {vpg_where_clause} {vertex_id} NOT IN (
SELECT {vertex_id}
FROM {message}
+ {message_grp_where}
)
- """.format(**locals()))
- # Check for convergence will be done as part of grouping support
for pagerank:
- # https://issues.apache.org/jira/browse/MADLIB-1082. So, the
threshold parameter
- # is a dummy variable at the moment, the PageRank computation
happens for
- # {max_iter} number of times.
+ {ignore_group_clause}
+ GROUP BY {grouping_cols_select} {cur}.{vertex_id}
+ """.format(grouping_cols_select=cur_grouping_cols_select+','
+ if grouping_cols else '',
+ vpg_from_clause=', {vertices_per_group} AS
{vpg}'.format(**locals())
+ if grouping_cols else '',
+ vpg_where_clause='{vpg_cur_join_clause} AND
'.format(**locals())
+ if grouping_cols else '',
+ message_grp_where='WHERE {message_grp}'.format(**locals())
+ if grouping_cols else '',
+
random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
+ if grouping_cols else random_probability,
+ ignore_group_clause=' AND '+get_ignore_groups(
+ summary_table, cur, grouping_cols_list)
+ if iteration_num>0 and grouping_cols else '',
+ **locals()))
+
+ # Check for convergence:
+ ## Check for convergence only if threshold != 0.
+ if threshold != 0:
+ # message_unconv and cur_unconv will contain the unconverged
groups
+ # after current # and previous iterations respectively. Groups
that
+ # are missing in message_unconv but appear in cur_unconv are
the
+ # groups that have converged after this iteration's
computations.
+ # If no grouping columns are specified, then we check if there
is
+ # at least one unconverged node (limit 1 is used in the query).
+ plpy.execute("""
+ CREATE TEMP TABLE {message_unconv} AS
+ SELECT {grouping_cols_select}
+ FROM {message}
+ INNER JOIN {cur}
+ ON {cur}.{vertex_id}={message}.{vertex_id}
+ WHERE {message_grp_clause}
+ ABS({cur}.pagerank-{message}.pagerank) >
{threshold}
+ {ignore_group_clause}
+ {group_by_grouping_cols}
+ {limit}
+ """.format(grouping_cols_select=cur_grouping_cols_select
+ if grouping_cols else '{0}.{1}'.format(cur,
vertex_id),
+ group_by_grouping_cols=' GROUP BY
{0}'.format(cur_grouping_cols_select)
+ if grouping_cols else '',
+ message_grp_clause='{0} AND '.format(message_grp)
+ if grouping_cols else '',
+ ignore_group_clause=' AND
'+get_ignore_groups(summary_table, cur,
+ grouping_cols_list) if iteration_num>0 and
grouping_cols else '',
+ limit='' if grouping_cols else ' LIMIT 1 ',
+ **locals()))
+ unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
+ """.format(message_unconv))[0]["cnt"]
+ if iteration_num > 0 and grouping_cols:
+ # Update result and summary tables for groups that have
converged
+ # since the last iteration.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, message, grouping_cols_list,
+ cur_unconv, message_unconv)
+ plpy.execute("""
+ DROP TABLE IF EXISTS {cur_unconv};
+ ALTER TABLE {message_unconv} RENAME TO {cur_unconv}
+ """.format(**locals()))
+ else:
+ # Do not run convergence test if threshold=0, since that
implies
+ # the user wants to run max_iter iterations.
+ unconverged = 1
plpy.execute("""
- DROP TABLE IF EXISTS {cur};
- ALTER TABLE {message} RENAME TO {cur}
- """.format(**locals()))
+ DROP TABLE IF EXISTS {cur};
+ ALTER TABLE {message} RENAME TO {cur}
+ """.format(**locals()))
+ if unconverged == 0:
+ break
- plpy.execute("ALTER TABLE {cur} RENAME TO
{out_table}".format(**locals()))
+ # If there still are some unconverged groups/(entire table), update
results.
+ if grouping_cols:
+ if unconverged > 0:
+ if threshold != 0:
+ # We completed max_iters, but there are still some
unconverged groups
+ # Update the result and summary tables for unconverged
groups.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
cur_unconv)
+ else:
+ # No group has converged. List of all group values are in
+ # distinct_grp_table.
+ update_result_tables(temp_summary_table, iteration_num,
+ summary_table, out_table, cur, grouping_cols_list,
distinct_grp_table)
+ else:
+ plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
+ """.format(table_name=cur, **locals()))
+ plpy.execute("""
+ INSERT INTO {summary_table} VALUES
+ ({iteration_num}+1);
+ """.format(**locals()))
## Step 4: Cleanup
- plpy.execute("""
- DROP TABLE IF EXISTS {0},{1},{2},{3};
- """.format(out_cnts, edge_temp_table, cur, message))
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
+ """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
+ message_unconv))
+ if grouping_cols:
+ plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
+ """.format(vertices_per_group, temp_summary_table,
+ distinct_grp_table))
plpy.execute("SET client_min_messages TO %s" % old_msg_level)
+def update_result_tables(temp_summary_table, i, summary_table, out_table,
--- End diff --
Using TRUNCATE instead of DROP and CREATE.
> Graph - add grouping to page rank
> ---------------------------------
>
> Key: MADLIB-1082
> URL: https://issues.apache.org/jira/browse/MADLIB-1082
> Project: Apache MADlib
> Issue Type: Improvement
> Components: Module: Graph
> Reporter: Frank McQuillan
> Assignee: Nandish Jayaram
> Priority: Minor
> Fix For: v1.11
>
>
> Add a grouping column to the edge table to support separate PageRank calculations
> for each group.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)