http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/pagerank.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/pagerank.py_in 
b/src/ports/postgres/modules/graph/pagerank.py_in
index 4ef5876..95bc4b4 100644
--- a/src/ports/postgres/modules/graph/pagerank.py_in
+++ b/src/ports/postgres/modules/graph/pagerank.py_in
@@ -32,36 +32,38 @@ from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
+from utilities.utilities import is_platform_pg
+
 from utilities.validate_args import columns_exist_in_table, get_cols_and_types
 from graph_utils import *
 
-m4_changequote(`<!', `!>')
 
 def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, damping_factor, max_iter, threshold,
-        grouping_cols_list, module_name):
+                           edge_params, out_table, damping_factor, max_iter,
+                           threshold, grouping_cols_list):
     """
     Function to validate input parameters for PageRank
     """
     validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-        out_table, module_name)
+                          out_table, 'PageRank')
     _assert(damping_factor >= 0.0 and damping_factor <= 1.0,
-        """PageRank: Invalid damping factor value ({0}), must be between 0 and 
1.""".
-        format(damping_factor))
+            "PageRank: Invalid damping factor value ({0}), must be between 0 
and 1.".
+            format(damping_factor))
     _assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
-        """PageRank: Invalid threshold value ({0}), must be between 0 and 
1.""".
-        format(threshold))
+            "PageRank: Invalid threshold value ({0}), must be between 0 and 
1.".
+            format(threshold))
     _assert(max_iter > 0,
-        """PageRank: Invalid max_iter value ({0}), must be a positive 
integer.""".
-        format(max_iter))
+            """PageRank: Invalid max_iter value ({0}), must be a positive 
integer.""".
+            format(max_iter))
     if grouping_cols_list:
         # validate the grouping columns. We currently only support 
grouping_cols
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, 
schema_madlib),
                 "PageRank error: One or more grouping columns specified do not 
exist!")
 
+
 def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
-    out_table, damping_factor, max_iter, threshold, grouping_cols, **kwargs):
+             out_table, damping_factor, max_iter, threshold, grouping_cols, 
**kwargs):
     """
     Function that computes the PageRank
 
@@ -76,458 +78,451 @@ def pagerank(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
         @param max_iter
         @param threshold
     """
-    old_msg_level = plpy.execute("""
-                                  SELECT setting
-                                  FROM pg_settings
-                                  WHERE name='client_min_messages'
-                                  """)[0]['setting']
-    plpy.execute('SET client_min_messages TO warning')
-    params_types = {'src': str, 'dest': str}
-    default_args = {'src': 'src', 'dest': 'dest'}
-    edge_params = extract_keyvalue_params(edge_args, params_types, 
default_args)
-
-    # populate default values for optional params if null
-    if damping_factor is None:
-        damping_factor = 0.85
-    if max_iter is None:
-        max_iter = 100
-    if vertex_id is None:
-        vertex_id = "id"
-    if not grouping_cols:
-        grouping_cols = ''
-
-    grouping_cols_list = split_quoted_delimited_str(grouping_cols)
-    validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, damping_factor, max_iter, threshold,
-        grouping_cols_list, 'PageRank')
-    summary_table = out_table + "_summary"
-    _assert(not table_exists(summary_table),
-        "Graph PageRank: Output summary table ({summary_table}) already 
exists."
-        .format(**locals()))
-    src = edge_params["src"]
-    dest = edge_params["dest"]
-    nvertices = plpy.execute("""
-                SELECT COUNT({0}) AS cnt
-                FROM {1}
-            """.format(vertex_id, vertex_table))[0]["cnt"]
-    # A fixed threshold value, of say 1e-5, might not work well when the
-    # number of vertices is a billion, since the initial pagerank value
-    # of all nodes would then be 1/1e-9. So, assign default threshold
-    # value based on number of nodes in the graph.
-    # NOTE: The heuristic below is not based on any scientific evidence.
-    if threshold is None:
-        threshold = 1.0/(nvertices*1000)
-
-    # table/column names used when grouping_cols is set.
-    distinct_grp_table = ''
-    vertices_per_group = ''
-    vpg = ''
-    grouping_where_clause = ''
-    group_by_clause = ''
-    random_prob = ''
-
-    edge_temp_table = unique_string(desp='temp_edge')
-    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', dest)!>)
-    plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
-    plpy.execute("""CREATE TEMP TABLE {edge_temp_table} AS
-        SELECT * FROM {edge_table}
-        {distribution}
-        """.format(**locals()))
-    # GPDB and HAWQ have distributed by clauses to help them with indexing.
-    # For Postgres we add the index explicitly.
-    sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-        <!"""CREATE INDEX ON {edge_temp_table} ({src});
-        """.format(**locals())!>,
-        <!''!>)
-    plpy.execute(sql_index)
-
-    # Intermediate tables required.
-    cur = unique_string(desp='cur')
-    message = unique_string(desp='message')
-    cur_unconv = unique_string(desp='cur_unconv')
-    message_unconv = unique_string(desp='message_unconv')
-    out_cnts = unique_string(desp='out_cnts')
-    out_cnts_cnt = unique_string(desp='cnt')
-    v1 = unique_string(desp='v1')
-
-    cur_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', vertex_id)!>)
-    cnts_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-            <!"DISTRIBUTED BY ({0}{1})".format(grouping_cols+','
-            if grouping_cols else '', vertex_id)!>)
-    cur_join_clause = """{edge_temp_table}.{dest}={cur}.{vertex_id}
-        """.format(**locals())
-    out_cnts_join_clause = """{out_cnts}.{vertex_id}={edge_temp_table}.{src}
-        """.format(**locals())
-    v1_join_clause = """{v1}.{vertex_id}={edge_temp_table}.{src}
-        """.format(**locals())
-
-    random_probability = (1.0-damping_factor)/nvertices
-    ######################################################################
-    # Create several strings that will be used to construct required
-    # queries. These strings will be required only during grouping.
-    random_jump_prob = random_probability
-    ignore_group_clause_first = ''
-    limit = ' LIMIT 1 '
-
-    grouping_cols_select_pr = ''
-    vertices_per_group_inner_join_pr = ''
-    ignore_group_clause_pr= ''
-
-    grouping_cols_select_ins = ''
-    vpg_from_clause_ins = ''
-    vpg_where_clause_ins = ''
-    message_grp_where_ins = ''
-    ignore_group_clause_ins = ''
-
-    nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
-    ignore_group_clause_ins_noincoming = ''
-
-    grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
-    group_by_grouping_cols_conv = ''
-    message_grp_clause_conv = ''
-    ignore_group_clause_conv = ''
-    ######################################################################
-
-    # Queries when groups are involved need a lot more conditions in
-    # various clauses, so populating the required variables. Some intermediate
-    # tables are unnecessary when no grouping is involved, so create some
-    # tables and certain columns only during grouping.
-    if grouping_cols:
-        distinct_grp_table = unique_string(desp='grp')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
-        plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
-                SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
-            """.format(**locals()))
-        vertices_per_group = unique_string(desp='nvert_grp')
-        init_pr = unique_string(desp='init')
-        random_prob = unique_string(desp='rand')
-        subq = unique_string(desp='subquery')
-        rand_damp = 1-damping_factor
-        grouping_where_clause = ' AND '.join(
-            [distinct_grp_table+'.'+col+'='+subq+'.'+col
-            for col in grouping_cols_list])
-        group_by_clause = ', '.join([distinct_grp_table+'.'+col
-            for col in grouping_cols_list])
-        # Find number of vertices in each group, this is the normalizing factor
-        # for computing the random_prob
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
-        plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
-                SELECT {distinct_grp_table}.*,
-                1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
-                {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS 
{random_prob}
-                FROM {distinct_grp_table} INNER JOIN (
-                    SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_temp_table}
-                    UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
-                ){subq}
-                ON {grouping_where_clause}
-                GROUP BY {group_by_clause}
-            """.format(**locals()))
-
-        grouping_where_clause = ' AND '.join(
-            [vertices_per_group+'.'+col+'='+subq+'.'+col
-            for col in grouping_cols_list])
-        group_by_clause = ', '.join([vertices_per_group+'.'+col
-            for col in grouping_cols_list])
+    with MinWarning('warning'):
+        params_types = {'src': str, 'dest': str}
+        default_args = {'src': 'src', 'dest': 'dest'}
+        edge_params = extract_keyvalue_params(edge_args, params_types, 
default_args)
+
+        # populate default values for optional params if null
+        if damping_factor is None:
+            damping_factor = 0.85
+        if max_iter is None:
+            max_iter = 100
+        if vertex_id is None:
+            vertex_id = "id"
+        if not grouping_cols:
+            grouping_cols = ''
+
+        grouping_cols_list = split_quoted_delimited_str(grouping_cols)
+        validate_pagerank_args(schema_madlib, vertex_table, vertex_id, 
edge_table,
+                               edge_params, out_table, damping_factor,
+                               max_iter, threshold, grouping_cols_list)
+        summary_table = out_table + "_summary"
+        _assert(not table_exists(summary_table),
+                "Graph PageRank: Output summary table ({summary_table}) 
already exists."
+                .format(**locals()))
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        n_vertices = plpy.execute("""
+                    SELECT COUNT({0}) AS cnt
+                    FROM {1}
+                """.format(vertex_id, vertex_table))[0]["cnt"]
+        # A fixed threshold value, of say 1e-5, might not work well when the
+        # number of vertices is a billion, since the initial pagerank value
+        # of all nodes would then be 1e-9. So, assign default threshold
+        # value based on number of nodes in the graph.
+        # NOTE: The heuristic below is not based on any scientific evidence.
+        if threshold is None:
+            threshold = 1.0 / (n_vertices * 1000)
+
+        # table/column names used when grouping_cols is set.
+        distinct_grp_table = ''
+        vertices_per_group = ''
+        vpg = ''
+        grouping_where_clause = ''
+        group_by_clause = ''
+        random_prob = ''
+
+        edge_temp_table = unique_string(desp='temp_edge')
+        grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
+        distribution = ('' if is_platform_pg() else
+                        "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, 
dest))
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
         plpy.execute("""
-                CREATE TEMP TABLE {cur} AS
-                SELECT {group_by_clause}, {subq}.__vertices__ as {vertex_id},
-                       {init_pr} AS pagerank
-                FROM {vertices_per_group} INNER JOIN (
-                    SELECT {grouping_cols}, {src} AS __vertices__
-                    FROM {edge_temp_table}
-                    UNION
-                    SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
-                ){subq}
-                ON {grouping_where_clause}
-                {cur_distribution}
-            """.format(**locals()))
-        vpg = unique_string(desp='vpg')
-        # Compute the out-degree of every node in the group-based subgraphs.
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
-        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
-            SELECT {grouping_cols_select} {src} AS {vertex_id},
-                   COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_temp_table}
-            GROUP BY {grouping_cols_select} {src}
-            {cnts_distribution}
-            """.format(grouping_cols_select=grouping_cols+','
-                if grouping_cols else '', **locals()))
-
-        message_grp = ' AND '.join(
-            ["{cur}.{col}={message}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
-            ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        vpg_join_clause = ' AND '.join(
-            ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        vpg_t1_join_clause = ' AND '.join(
-            ["__t1__.{col}={vpg}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        # join clause specific to populating random_prob for nodes without any
-        # incoming edges.
-        edge_grouping_cols_select = ', '.join(
-            ["{edge_temp_table}.{col}".format(**locals())
-                for col in grouping_cols_list])
-        cur_grouping_cols_select = ', '.join(
-            ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
-        # Create output summary table:
-        cols_names_types = get_cols_and_types(edge_table)
-        grouping_cols_clause = ', '.join([c_name+" "+c_type
-            for (c_name, c_type) in cols_names_types
-            if c_name in grouping_cols_list])
-        plpy.execute("""
-                CREATE TABLE {summary_table} (
-                    {grouping_cols_clause},
-                    __iterations__ INTEGER
-                )
-            """.format(**locals()))
-        # Create output table. This will be updated whenever a group converges
-        # Note that vertex_id is assumed to be an integer (as described in
-        # documentation)
-        plpy.execute("""
-                CREATE TABLE {out_table} (
-                    {grouping_cols_clause},
-                    {vertex_id} INTEGER,
-                    pagerank DOUBLE PRECISION
-                )
-            """.format(**locals()))
-        temp_summary_table = unique_string(desp='temp_summary')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
-        plpy.execute("""
-                CREATE TABLE {temp_summary_table} (
-                    {grouping_cols_clause}
-                )
+            CREATE TEMP TABLE {edge_temp_table} AS
+                SELECT * FROM {edge_table}
+                {distribution}
             """.format(**locals()))
+
+        # GPDB and HAWQ have distributed by clauses to help them with indexing.
+        # For Postgres we add the index explicitly.
+        if is_platform_pg():
+            plpy.execute("CREATE INDEX ON {0}({1})".format(edge_temp_table, 
src))
+
+        # Intermediate tables required.
+        cur = unique_string(desp='cur')
+        message = unique_string(desp='message')
+        cur_unconv = unique_string(desp='cur_unconv')
+        message_unconv = unique_string(desp='message_unconv')
+        out_cnts = unique_string(desp='out_cnts')
+        out_cnts_cnt = unique_string(desp='cnt')
+        v1 = unique_string(desp='v1')
+
+        if is_platform_pg():
+            cur_distribution = cnts_distribution = ''
+        else:
+            cur_distribution = cnts_distribution = \
+                "DISTRIBUTED BY ({0}{1})".format(grouping_cols_comma, 
vertex_id)
+        cur_join_clause = "{edge_temp_table}.{dest} = 
{cur}.{vertex_id}".format(**locals())
+        out_cnts_join_clause = "{out_cnts}.{vertex_id} = 
{edge_temp_table}.{src}".format(**locals())
+        v1_join_clause = "{v1}.{vertex_id} = 
{edge_temp_table}.{src}".format(**locals())
+
+        random_probability = (1.0 - damping_factor) / n_vertices
+        ######################################################################
+        # Create several strings that will be used to construct required
+        # queries. These strings will be required only during grouping.
+        random_jump_prob = random_probability
+        ignore_group_clause_first = ''
+        limit = ' LIMIT 1 '
+
+        grouping_cols_select_pr = ''
+        vertices_per_group_inner_join_pr = ''
+        ignore_group_clause_pr = ''
+
+        grouping_cols_select_ins = ''
+        vpg_from_clause_ins = ''
+        vpg_where_clause_ins = ''
+        message_grp_where_ins = ''
+        ignore_group_clause_ins = ''
+
+        nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
+        ignore_group_clause_ins_noincoming = ''
+
+        grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
+        group_by_grouping_cols_conv = ''
+        message_grp_clause_conv = ''
+        ignore_group_clause_conv = ''
         ######################################################################
-        # Strings required for the main PageRank computation query
-        grouping_cols_select_pr = edge_grouping_cols_select+', '
-        random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
-        vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
-            AS {vpg} ON {vpg_join_clause}""".format(**locals())
-        ignore_group_clause_pr=' WHERE '+get_ignore_groups(summary_table,
-            edge_temp_table, grouping_cols_list)
-        ignore_group_clause_ins_noincoming = ' WHERE '+get_ignore_groups(
-            summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
-        # Strings required for updating PageRank scores of vertices that have
-        # no incoming edges
-        grouping_cols_select_ins = cur_grouping_cols_select+','
-        vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
-            **locals())
-        vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
-            **locals())
-        message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
-        ignore_group_clause_ins = ' AND '+get_ignore_groups(summary_table,
-            cur, grouping_cols_list)
-        # Strings required for convergence test query
-        grouping_cols_select_conv = cur_grouping_cols_select
-        group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
-            cur_grouping_cols_select)
-        message_grp_clause_conv = '{0} AND '.format(message_grp)
-        ignore_group_clause_conv = ' AND '+get_ignore_groups(summary_table,
-            cur, grouping_cols_list)
-        limit = ''
-
-        # Find all nodes, in each group, that have no incoming edges. The 
PageRank
-        # value of these nodes are not updated using the first query in the
-        # following for loop. They must be explicitly plugged back in to the
-        # message table, with their corresponding group's random_prob as their
-        # PageRank values.
-        plpy.execute("""
-                CREATE TABLE {nodes_with_no_incoming_edges} AS
-                SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
-                        {vpg}.{random_prob} AS pagerank
-                FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
-                WHERE NOT EXISTS (
-                    SELECT 1
-                    FROM {edge_temp_table} AS __t2__
-                    WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
-                ) {vpg_where_clause_ins}
-            """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
-                        for col in grouping_cols_list]),
-                    where_group_clause=' AND 
'.join(['__t1__.{0}=__t2__.{0}'.format(col)
-                        for col in grouping_cols_list]),
-                    **locals()))
-    else:
-        # cur and out_cnts tables can be simpler when no grouping is involved.
-        init_value = 1.0/nvertices
-        plpy.execute("""
-                CREATE TEMP TABLE {cur} AS
-                SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
-                FROM {vertex_table}
-                {cur_distribution}
-            """.format(**locals()))
 
-        # Compute the out-degree of every node in the graph.
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
-        plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
-            SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
-            FROM {edge_temp_table}
-            GROUP BY {src}
-            {cnts_distribution}
-            """.format(**locals()))
-        # The summary table when there is no grouping will contain only
-        # the iteration column. We don't need to create the out_table
-        # when no grouping is used since the 'cur' table will be renamed
-        # to out_table after pagerank computation is completed.
-        plpy.execute("""
-                CREATE TABLE {summary_table} (
-                    __iterations__ INTEGER
-                )
-            """.format(**locals()))
-        # Find all nodes in the graph that don't have any incoming edges and
-        # assign random_prob as their pagerank values.
-        plpy.execute("""
-                CREATE TABLE {nodes_with_no_incoming_edges} AS
-                SELECT DISTINCT({src}), {random_probability} AS pagerank
-                FROM {edge_temp_table}
-                EXCEPT
-                    (SELECT DISTINCT({dest}), {random_probability} AS pagerank
-                    FROM {edge_temp_table})
-            """.format(**locals()))
-    unconverged = 0
-    iteration_num = 0
-    for iteration_num in range(max_iter):
-        #####################################################################
-        # PageRank for node 'A' at any given iteration 'i' is given by:
-        # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
-        #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
-        # where 'N' is the number of vertices in the graph,
-        # B, C are nodes that have edges to node A, and
-        # degree(node) represents the number of outgoing edges from 'node'
-        #####################################################################
-        # Essentially, the pagerank for a node is based on an aggregate of a
-        # fraction of the pagerank values of all the nodes that have incoming
-        # edges to it, along with a small random probability.
-        # More information can be found at:
-        # https://en.wikipedia.org/wiki/PageRank#Damping_factor
-
-        # The query below computes the PageRank of each node using the above
-        # formula. A small explanatory note on ignore_group_clause:
-        # This is used only when grouping is set. This essentially will have
-        # the condition that will help skip the PageRank computation on groups
-        # that have converged.
-        plpy.execute("""
-                CREATE TABLE {message} AS
-                SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS 
{vertex_id},
-                        
SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob}
 AS pagerank
+        # Queries when groups are involved need a lot more conditions in
+        # various clauses, so populating the required variables. Some 
intermediate
+        # tables are unnecessary when no grouping is involved, so create some
+        # tables and certain columns only during grouping.
+        if grouping_cols:
+            distinct_grp_table = unique_string(desp='grp')
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+            plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
+                    SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
+                """.format(**locals()))
+            vertices_per_group = unique_string(desp='nvert_grp')
+            init_pr = unique_string(desp='init')
+            random_prob = unique_string(desp='rand')
+            subq = unique_string(desp='subquery')
+            rand_damp = 1 - damping_factor
+            grouping_where_clause = ' AND '.join(
+                [distinct_grp_table + '.' + col + '=' + subq + '.' + col
+                 for col in grouping_cols_list])
+            group_by_clause = ', '.join([distinct_grp_table + '.' + col
+                                         for col in grouping_cols_list])
+            # Find number of vertices in each group, this is the normalizing 
factor
+            # for computing the random_prob
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(vertices_per_group))
+            plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
+                    SELECT {distinct_grp_table}.*,
+                    1/COUNT(__vertices__)::DOUBLE PRECISION AS {init_pr},
+                    {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION AS 
{random_prob}
+                    FROM {distinct_grp_table} INNER JOIN (
+                        SELECT {grouping_cols}, {src} AS __vertices__
+                        FROM {edge_temp_table}
+                        UNION
+                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+                    ){subq}
+                    ON {grouping_where_clause}
+                    GROUP BY {group_by_clause}
+                """.format(**locals()))
+
+            grouping_where_clause = ' AND '.join(
+                [vertices_per_group + '.' + col + '=' + subq + '.' + col
+                 for col in grouping_cols_list])
+            group_by_clause = ', '.join([vertices_per_group + '.' + col
+                                         for col in grouping_cols_list])
+            plpy.execute("""
+                    CREATE TEMP TABLE {cur} AS
+                    SELECT {group_by_clause}, {subq}.__vertices__ as 
{vertex_id},
+                           {init_pr} AS pagerank
+                    FROM {vertices_per_group} INNER JOIN (
+                        SELECT {grouping_cols}, {src} AS __vertices__
+                        FROM {edge_temp_table}
+                        UNION
+                        SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
+                    ){subq}
+                    ON {grouping_where_clause}
+                    {cur_distribution}
+                """.format(**locals()))
+            vpg = unique_string(desp='vpg')
+            # Compute the out-degree of every node in the group-based 
subgraphs.
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+            plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+                SELECT {grouping_cols_select} {src} AS {vertex_id},
+                       COUNT({dest}) AS {out_cnts_cnt}
                 FROM {edge_temp_table}
-                    INNER JOIN {cur} ON {cur_join_clause}
-                    INNER JOIN {out_cnts} ON {out_cnts_join_clause}
-                    INNER JOIN {cur} AS {v1} ON {v1_join_clause}
-                    {vertices_per_group_inner_join_pr}
-                {ignore_group_clause}
-                GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
-                {cur_distribution}
-            """.format(ignore_group_clause=ignore_group_clause_pr
-                    if iteration_num>0 else ignore_group_clause_first,
-                **locals()))
-        # If there are nodes that have no incoming edges, they are not
-        # captured in the message table. Insert entries for such nodes,
-        # with random_prob.
-        plpy.execute("""
-                INSERT INTO {message}
-                SELECT *
-                FROM {nodes_with_no_incoming_edges}
-                {ignore_group_clause}
-            """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
-                    if iteration_num>0 else ignore_group_clause_first,
-                    **locals()))
-        # Check for convergence:
-        ## Check for convergence only if threshold != 0.
-        if threshold != 0:
-            # message_unconv and cur_unconv will contain the unconverged groups
-            # after current # and previous iterations respectively. Groups that
-            # are missing in message_unconv but appear in cur_unconv are the
-            # groups that have converged after this iteration's computations.
-            # If no grouping columns are specified, then we check if there is
-            # at least one unconverged node (limit 1 is used in the query).
+                GROUP BY {grouping_cols_select} {src}
+                {cnts_distribution}
+                """.format(grouping_cols_select=grouping_cols + ','
+                           if grouping_cols else '', **locals()))
+
+            message_grp = ' AND '.join(
+                ["{cur}.{col}={message}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            cur_join_clause = cur_join_clause + ' AND ' + ' AND '.join(
+                ["{edge_temp_table}.{col}={cur}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            out_cnts_join_clause = out_cnts_join_clause + ' AND ' + ' AND 
'.join(
+                ["{edge_temp_table}.{col}={out_cnts}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            v1_join_clause = v1_join_clause + ' AND ' + ' AND '.join(
+                ["{edge_temp_table}.{col}={v1}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            vpg_join_clause = ' AND '.join(
+                ["{edge_temp_table}.{col}={vpg}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            vpg_t1_join_clause = ' AND '.join(
+                ["__t1__.{col}={vpg}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            # join clause specific to populating random_prob for nodes without 
any
+            # incoming edges.
+            edge_grouping_cols_select = ', '.join(
+                ["{edge_temp_table}.{col}".format(**locals())
+                    for col in grouping_cols_list])
+            cur_grouping_cols_select = ', '.join(
+                ["{cur}.{col}".format(**locals()) for col in grouping_cols_list])
+            # Create output summary table:
+            cols_names_types = get_cols_and_types(edge_table)
+            grouping_cols_clause = ', '.join([c_name + " " + c_type
+                                              for (c_name, c_type) in cols_names_types
+                                              if c_name in grouping_cols_list])
             plpy.execute("""
-                    CREATE TEMP TABLE {message_unconv} AS
-                    SELECT {grouping_cols_select_conv}
-                    FROM {message}
-                    INNER JOIN {cur}
-                    ON {cur}.{vertex_id}={message}.{vertex_id}
-                    WHERE {message_grp_clause_conv}
-                        ABS({cur}.pagerank-{message}.pagerank) > {threshold}
-                    {ignore_group_clause}
-                    {group_by_grouping_cols_conv}
-                    {limit}
-                """.format(ignore_group_clause=ignore_group_clause_ins
-                        if iteration_num>0 else ignore_group_clause_conv,
-                    **locals()))
-            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
-                """.format(message_unconv))[0]["cnt"]
-            if iteration_num > 0 and grouping_cols:
-                # Update result and summary tables for groups that have
-                # converged
-                # since the last iteration.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, message, grouping_cols_list,
-                    cur_unconv, message_unconv)
-            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
-            plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
-                {cur_unconv} """.format(**locals()))
+                    CREATE TABLE {summary_table} (
+                        {grouping_cols_clause},
+                        __iterations__ INTEGER
+                    )
+                """.format(**locals()))
+            # Create output table. This will be updated whenever a group converges
+            # Note that vertex_id is assumed to be an integer (as described in
+            # documentation)
+            plpy.execute("""
+                    CREATE TABLE {out_table} (
+                        {grouping_cols_clause},
+                        {vertex_id} INTEGER,
+                        pagerank DOUBLE PRECISION
+                    )
+                """.format(**locals()))
+            temp_summary_table = unique_string(desp='temp_summary')
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_summary_table))
+            plpy.execute("""
+                    CREATE TABLE {temp_summary_table} (
+                        {grouping_cols_clause}
+                    )
+                """.format(**locals()))
+            ######################################################################
+            # Strings required for the main PageRank computation query
+            grouping_cols_select_pr = edge_grouping_cols_select + ', '
+            random_jump_prob = 'MIN({vpg}.{random_prob})'.format(**locals())
+            vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
+                AS {vpg} ON {vpg_join_clause}""".format(**locals())
+            ignore_group_clause_pr = ' WHERE ' + get_ignore_groups(summary_table,
+                                                                   edge_temp_table, grouping_cols_list)
+            ignore_group_clause_ins_noincoming = ' WHERE ' + get_ignore_groups(
+                summary_table, nodes_with_no_incoming_edges, grouping_cols_list)
+            # Strings required for updating PageRank scores of vertices that have
+            # no incoming edges
+            grouping_cols_select_ins = cur_grouping_cols_select + ','
+            vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
+                **locals())
+            vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
+                **locals())
+            message_grp_where_ins = 'WHERE {message_grp}'.format(**locals())
+            ignore_group_clause_ins = ' AND ' + get_ignore_groups(summary_table,
+                                                                  cur, grouping_cols_list)
+            # Strings required for convergence test query
+            grouping_cols_select_conv = cur_grouping_cols_select
+            group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
+                cur_grouping_cols_select)
+            message_grp_clause_conv = '{0} AND '.format(message_grp)
+            ignore_group_clause_conv = ' AND ' + get_ignore_groups(summary_table,
+                                                                   cur, grouping_cols_list)
+            limit = ''
+
+            # Find all nodes, in each group, that have no incoming edges. The PageRank
+            # value of these nodes are not updated using the first query in the
+            # following for loop. They must be explicitly plugged back in to the
+            # message table, with their corresponding group's random_prob as their
+            # PageRank values.
+            plpy.execute("""
+                    CREATE TABLE {nodes_with_no_incoming_edges} AS
+                    SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
+                            {vpg}.{random_prob} AS pagerank
+                    FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {edge_temp_table} AS __t2__
+                        WHERE __t1__.{src}=__t2__.{dest} AND {where_group_clause}
+                    ) {vpg_where_clause_ins}
+                """.format(select_group_cols=','.join(['__t1__.{0}'.format(col)
+                                                       for col in grouping_cols_list]),
+                           where_group_clause=' AND '.join(['__t1__.{0}=__t2__.{0}'.format(col)
+                                                            for col in grouping_cols_list]),
+                           **locals()))
         else:
-            # Do not run convergence test if threshold=0, since that implies
-            # the user wants to run max_iter iterations.
-            unconverged = 1
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
-        plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+            # cur and out_cnts tables can be simpler when no grouping is involved.
+            init_value = 1.0 / n_vertices
+            plpy.execute("""
+                    CREATE TEMP TABLE {cur} AS
+                    SELECT {vertex_id}, {init_value}::DOUBLE PRECISION AS pagerank
+                    FROM {vertex_table}
+                    {cur_distribution}
                 """.format(**locals()))
-        if unconverged == 0:
-            break
 
-    # If there still are some unconverged groups/(entire table),
-    # update results.
-    if grouping_cols:
-        if unconverged > 0:
+            # Compute the out-degree of every node in the graph.
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
+            plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
+                SELECT {src} AS {vertex_id}, COUNT({dest}) AS {out_cnts_cnt}
+                FROM {edge_temp_table}
+                GROUP BY {src}
+                {cnts_distribution}
+                """.format(**locals()))
+            # The summary table when there is no grouping will contain only
+            # the iteration column. We don't need to create the out_table
+            # when no grouping is used since the 'cur' table will be renamed
+            # to out_table after pagerank computation is completed.
+            plpy.execute("""
+                    CREATE TABLE {summary_table} (
+                        __iterations__ INTEGER
+                    )
+                """.format(**locals()))
+            # Find all nodes in the graph that don't have any incoming edges and
+            # assign random_prob as their pagerank values.
+            plpy.execute("""
+                    CREATE TABLE {nodes_with_no_incoming_edges} AS
+                    SELECT DISTINCT({src}), {random_probability} AS pagerank
+                    FROM {edge_temp_table}
+                    EXCEPT
+                        (SELECT DISTINCT({dest}), {random_probability} AS pagerank
+                        FROM {edge_temp_table})
+                """.format(**locals()))
+        unconverged = 0
+        iteration_num = 0
+        for iteration_num in range(max_iter):
+            #####################################################################
+            # PageRank for node 'A' at any given iteration 'i' is given by:
+            # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
+            #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
+            # where 'N' is the number of vertices in the graph,
+            # B, C are nodes that have edges to node A, and
+            # degree(node) represents the number of outgoing edges from 'node'
+            #####################################################################
+            # Essentially, the pagerank for a node is based on an aggregate of a
+            # fraction of the pagerank values of all the nodes that have incoming
+            # edges to it, along with a small random probability.
+            # More information can be found at:
+            # https://en.wikipedia.org/wiki/PageRank#Damping_factor
+
+            # The query below computes the PageRank of each node using the above
+            # formula. A small explanatory note on ignore_group_clause:
+            # This is used only when grouping is set. This essentially will have
+            # the condition that will help skip the PageRank computation on groups
+            # that have converged.
+            plpy.execute("""
+                    CREATE TABLE {message} AS
+                    SELECT {grouping_cols_select_pr} {edge_temp_table}.{dest} AS {vertex_id},
+                            SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
+                    FROM {edge_temp_table}
+                        INNER JOIN {cur} ON {cur_join_clause}
+                        INNER JOIN {out_cnts} ON {out_cnts_join_clause}
+                        INNER JOIN {cur} AS {v1} ON {v1_join_clause}
+                        {vertices_per_group_inner_join_pr}
+                    {ignore_group_clause}
+                    GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
+                    {cur_distribution}
+                """.format(ignore_group_clause=ignore_group_clause_pr
+                           if iteration_num > 0 else ignore_group_clause_first,
+                           **locals()))
+            # If there are nodes that have no incoming edges, they are not
+            # captured in the message table. Insert entries for such nodes,
+            # with random_prob.
+            plpy.execute("""
+                    INSERT INTO {message}
+                    SELECT *
+                    FROM {nodes_with_no_incoming_edges}
+                    {ignore_group_clause}
+                """.format(ignore_group_clause=ignore_group_clause_ins_noincoming
+                           if iteration_num > 0 else ignore_group_clause_first,
+                           **locals()))
+            # Check for convergence:
+            # Check for convergence only if threshold != 0.
             if threshold != 0:
-                # We completed max_iters, but there are still some unconverged
-                # groups # Update the result and summary tables for unconverged
-                # groups.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, cur, grouping_cols_list,
-                    cur_unconv)
+                # message_unconv and cur_unconv will contain the unconverged groups
+                # after current # and previous iterations respectively. Groups that
+                # are missing in message_unconv but appear in cur_unconv are the
+                # groups that have converged after this iteration's computations.
+                # If no grouping columns are specified, then we check if there is
+                # at least one unconverged node (limit 1 is used in the query).
+                plpy.execute("""
+                        CREATE TEMP TABLE {message_unconv} AS
+                        SELECT {grouping_cols_select_conv}
+                        FROM {message}
+                        INNER JOIN {cur}
+                        ON {cur}.{vertex_id}={message}.{vertex_id}
+                        WHERE {message_grp_clause_conv}
+                            ABS({cur}.pagerank-{message}.pagerank) > {threshold}
+                        {ignore_group_clause}
+                        {group_by_grouping_cols_conv}
+                        {limit}
+                    """.format(ignore_group_clause=ignore_group_clause_ins
+                               if iteration_num > 0 else ignore_group_clause_conv,
+                               **locals()))
+                unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
+                    """.format(message_unconv))[0]["cnt"]
+                if iteration_num > 0 and grouping_cols:
+                    # Update result and summary tables for groups that have
+                    # converged
+                    # since the last iteration.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, message, grouping_cols_list,
+                                         cur_unconv, message_unconv)
+                plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
+                plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
+                    {cur_unconv} """.format(**locals()))
             else:
-                # No group has converged. List of all group values are in
-                # distinct_grp_table.
-                update_result_tables(temp_summary_table, iteration_num,
-                    summary_table, out_table, cur, grouping_cols_list,
-                    distinct_grp_table)
-    else:
-        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
-            """.format(table_name=cur, **locals()))
-        plpy.execute("""
+                # Do not run convergence test if threshold=0, since that implies
+                # the user wants to run max_iter iterations.
+                unconverged = 1
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
+            plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
+                    """.format(**locals()))
+            if unconverged == 0:
+                break
+
+        # If there still are some unconverged groups/(entire table),
+        # update results.
+        if grouping_cols:
+            if unconverged > 0:
+                if threshold != 0:
+                    # We completed max_iters, but there are still some unconverged
+                    # groups # Update the result and summary tables for unconverged
+                    # groups.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, cur, grouping_cols_list,
+                                         cur_unconv)
+                else:
+                    # No group has converged. List of all group values are in
+                    # distinct_grp_table.
+                    update_result_tables(temp_summary_table, iteration_num,
+                                         summary_table, out_table, cur, grouping_cols_list,
+                                         distinct_grp_table)
+        else:
+            plpy.execute("""
+                ALTER TABLE {table_name}
+                RENAME TO {out_table}
+                """.format(table_name=cur, **locals()))
+            plpy.execute("""
                 INSERT INTO {summary_table} VALUES
-                ({iteration_num}+1);
-            """.format(**locals()))
+                ({iteration_num}+1)
+                """.format(**locals()))
+
+        # Step 4: Cleanup
+        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
+            """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
+                       message_unconv, nodes_with_no_incoming_edges))
+        if grouping_cols:
+            plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
+                """.format(vertices_per_group, temp_summary_table,
+                           distinct_grp_table))
 
-    ## Step 4: Cleanup
-    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6};
-        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
-                    message_unconv, nodes_with_no_incoming_edges))
-    if grouping_cols:
-        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
-            """.format(vertices_per_group, temp_summary_table,
-                distinct_grp_table))
-    plpy.execute("SET client_min_messages TO %s" % old_msg_level)
 
 def update_result_tables(temp_summary_table, i, summary_table, out_table,
-    res_table, grouping_cols_list, cur_unconv, message_unconv=None):
+                         res_table, grouping_cols_list, cur_unconv,
+                         message_unconv=None):
     """
         This function updates the summary and output tables only for those
         groups that have converged. This is found out by looking at groups
@@ -556,7 +551,7 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
             FROM {cur_unconv}
             WHERE {join_condition}
             """.format(join_condition=get_ignore_groups(
-                message_unconv, cur_unconv, grouping_cols_list), **locals()))
+            message_unconv, cur_unconv, grouping_cols_list), **locals()))
     plpy.execute("""
         INSERT INTO {summary_table}
         SELECT *, {i}+1 AS __iteration__
@@ -569,21 +564,24 @@ def update_result_tables(temp_summary_table, i, summary_table, out_table,
         INNER JOIN {temp_summary_table}
         ON {join_condition}
         """.format(join_condition=' AND '.join(
-                ["{res_table}.{col}={temp_summary_table}.{col}".format(
-                    **locals())
-                for col in grouping_cols_list]), **locals()))
+        ["{res_table}.{col}={temp_summary_table}.{col}".format(
+            **locals())
+         for col in grouping_cols_list]), **locals()))
+
 
 def get_ignore_groups(first_table, second_table, grouping_cols_list):
     """
         This function generates the necessary clause to only select the
         groups that appear in second_table and not in first_table.
     """
-    return """({second_table_cols}) NOT IN (SELECT {grouping_cols} FROM
-    {first_table}) """.format(second_table_cols=', '.join(
-            ["{second_table}.{col}".format(**locals())
-            for col in grouping_cols_list]),
-        grouping_cols=', '.join([col for col in grouping_cols_list]),
-        **locals())
+    second_table_cols = ', '.join(["{0}.{1}".format(second_table, col)
+                                   for col in grouping_cols_list])
+    grouping_cols = ', '.join([col for col in grouping_cols_list])
+    return """({second_table_cols}) NOT IN
+                (SELECT {grouping_cols}
+                 FROM {first_table})
+           """.format(**locals())
+
 
 def pagerank_help(schema_madlib, message, **kwargs):
     """
@@ -601,7 +599,7 @@ def pagerank_help(schema_madlib, message, **kwargs):
             message.lower() in ("usage", "help", "?"):
         help_string = "Get from method below"
         help_string = get_graph_usage(schema_madlib, 'PageRank',
-            """out_table     TEXT, -- Name of the output table for PageRank
+                                      """out_table     TEXT, -- Name of the output table for PageRank
     damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
                                      -- (DEFAULT = 0.85)
     max_iter      INTEGER, -- Maximum iteration number (DEFAULT = 100)

Reply via email to