Repository: incubator-madlib
Updated Branches:
  refs/heads/master 8c9b955cd -> d487df3c4


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in 
b/src/ports/postgres/modules/graph/sssp.py_in
index 4839d2d..93497c4 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,21 +33,22 @@ from graph_utils import get_graph_usage
 from graph_utils import _grp_from_table
 from graph_utils import _check_groups
 from utilities.control import MinWarning
+
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
 from utilities.utilities import split_quoted_delimited_str
+from utilities.utilities import is_platform_pg, is_platform_hawq
+
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import get_expr_type
 
-m4_changequote(`<!', `!>')
 
 def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
-               edge_args, source_vertex, out_table, grouping_cols, **kwargs):
-
-       """
+               edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+    """
     Single source shortest path function for graphs using the Bellman-Ford
     algorhtm [1].
     Args:
@@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
     [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
     """
 
-       with MinWarning("warning"):
-
-               INT_MAX = 2147483647
-               INFINITY = "'Infinity'"
-               EPSILON = 0.000001
-
-               message = unique_string(desp='message')
-
-               oldupdate = unique_string(desp='oldupdate')
-               newupdate = unique_string(desp='newupdate')
-
-               params_types = {'src': str, 'dest': str, 'weight': str}
-               default_args = {'src': 'src', 'dest': 'dest', 'weight': 
'weight'}
-               edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-               # Prepare the input for recording in the summary table
-               if vertex_id is None:
-                       v_st= "NULL"
-                       vertex_id = "id"
-               else:
-                       v_st = vertex_id
-               if edge_args is None:
-                       e_st = "NULL"
-               else:
-                       e_st = edge_args
-               if grouping_cols is None:
-                       g_st = "NULL"
-                       glist = None
-               else:
-                       g_st = grouping_cols
-                       glist = split_quoted_delimited_str(grouping_cols)
-
-               src = edge_params["src"]
-               dest = edge_params["dest"]
-               weight = edge_params["weight"]
-
-               distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-                       <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
-               local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-                       <!"DISTRIBUTED BY (id)"!>)
-
-               is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-               _validate_sssp(vertex_table, vertex_id, edge_table,
-                       edge_params, source_vertex, out_table, glist)
-
-               plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
-                       message,oldupdate,newupdate))
-
-               # Initialize grouping related variables
-               comma_grp = ""
-               comma_grp_e = ""
-               comma_grp_m = ""
-               grp_comma = ""
-               checkg_oo = ""
-               checkg_eo = ""
-               checkg_ex = ""
-               checkg_om = ""
-               group_by = ""
-
-               if grouping_cols is not None:
-                       comma_grp = " , " + grouping_cols
-                       group_by = " , " + _grp_from_table(edge_table,glist)
-                       comma_grp_e = " , " + _grp_from_table(edge_table,glist)
-                       comma_grp_m = " , " + _grp_from_table("message",glist)
-                       grp_comma = grouping_cols + " , "
-
-                       checkg_oo_sub = 
_check_groups(out_table,"oldupdate",glist)
-                       checkg_oo = " AND " + checkg_oo_sub
-                       checkg_eo = " AND " + 
_check_groups(edge_table,"oldupdate",glist)
-                       checkg_ex = " AND " + 
_check_groups(edge_table,"x",glist)
-                       checkg_om = " AND " + 
_check_groups("out_table","message",glist)
-
-               w_type = get_expr_type(weight,edge_table).lower()
-               init_w = INT_MAX
-               if w_type in ['real','double precision','float8']:
-                       init_w = INFINITY
-
-               # We keep a table of every vertex, the minimum cost to that 
destination
-               # seen so far and the parent to this vertex in the associated 
shortest
-               # path. This table will be updated throughout the execution.
-               plpy.execute(
-                       """ CREATE TABLE {out_table} AS ( SELECT
-                                       {grp_comma} {src} AS {vertex_id}, 
{weight},
-                                       {src} AS parent FROM {edge_table} LIMIT 
0)
-                               {distribution} """.format(**locals()))
-
-               # We keep a summary table to keep track of the parameters used 
for this
-               # SSSP run. This table is used in the path finding function to 
eliminate
-               # the need for repetition.
-               plpy.execute( """ CREATE TABLE {out_table}_summary  (
-                       vertex_table            TEXT,
-                       vertex_id               TEXT,
-                       edge_table              TEXT,
-                       edge_args               TEXT,
-                       source_vertex           INTEGER,
-                       out_table               TEXT,
-                       grouping_cols           TEXT)
-                       """.format(**locals()))
-               plpy.execute( """ INSERT INTO {out_table}_summary VALUES
-                       ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
-                       {source_vertex}, '{out_table}', '{g_st}')
-                       """.format(**locals()))
-
-               # We keep 2 update tables and alternate them during the 
execution.
-               # This is necessary since we need to know which vertices are 
updated in
-               # the previous iteration to calculate the next set of updates.
-               plpy.execute(
-                       """ CREATE TEMP TABLE {oldupdate} AS ( SELECT
-                                       {src} AS id, {weight},
-                                       {src} AS parent {comma_grp} FROM 
{edge_table} LIMIT 0)
-                               {local_distribution}
-                               """.format(**locals()))
-               plpy.execute(
-                       """ CREATE TEMP TABLE {newupdate} AS ( SELECT
-                                       {src} AS id, {weight},
-                                       {src} AS parent {comma_grp} FROM 
{edge_table} LIMIT 0)
-                               {local_distribution}
-                               """.format(**locals()))
-
-               # Since HAWQ does not allow us to update, we create a new table 
and
-               # rename at every iteration.
-               if is_hawq:
-                       temp_table = unique_string(desp='temp')
-                       sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM 
{out_table} )
-                               {distribution} """
-                       plpy.execute(sql.format(**locals()))
-
-               # GPDB and HAWQ have distributed by clauses to help them with 
indexing.
-               # For Postgres we add the indices manually.
-               sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-                       <!""" CREATE INDEX ON {out_table} ({vertex_id});
-                               CREATE INDEX ON {oldupdate} (id);
-                               CREATE INDEX ON {newupdate} (id);
-                       """.format(**locals())!>,
-                       <!''!>)
-               plpy.execute(sql_index)
-
-               # The initialization step is quite different when grouping is 
involved
-               # since not every group (subgraph) will have the same set of 
vertices.
-
-               # Example:
-               # Assume there are two grouping columns g1 and g2
-               # g1 values are 0 and 1. g2 values are 5 and 6
-               if grouping_cols is not None:
-
-                       distinct_grp_table = unique_string(desp='grp')
-                       plpy.execute(""" DROP TABLE IF EXISTS 
{distinct_grp_table} """.
-                               format(**locals()))
-                       plpy.execute( """ CREATE TEMP TABLE 
{distinct_grp_table} AS
-                               SELECT DISTINCT {grouping_cols} FROM 
{edge_table} """.
-                               format(**locals()))
-                       subq = unique_string(desp='subquery')
-
-                       checkg_ds_sub = 
_check_groups(distinct_grp_table,subq,glist)
-                       grp_d_comma = _grp_from_table(distinct_grp_table,glist) 
+","
-
-                       plpy.execute(
-                               """ INSERT INTO {out_table}
-                               SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
-                                       {init_w} AS {weight}, NULL::INT AS 
parent
-                               FROM {distinct_grp_table} INNER JOIN
-                                       (
-                                       SELECT {src} AS {vertex_id} {comma_grp}
-                                       FROM {edge_table}
-                                       UNION
-                                       SELECT {dest} AS {vertex_id} {comma_grp}
-                                       FROM {edge_table}
-                                       ) {subq} ON ({checkg_ds_sub})
-                               WHERE {vertex_id} IS NOT NULL
-                               """.format(**locals()))
-
-                       plpy.execute(
-                               """ INSERT INTO {oldupdate}
-                                       SELECT {source_vertex}, 0, 
{source_vertex},
-                                       {grouping_cols}
-                                       FROM {distinct_grp_table}
-                               """.format(**locals()))
-
-                       # The maximum number of vertices for any group.
-                       # Used for determining negative cycles.
-                       v_cnt = plpy.execute(
-                               """ SELECT max(count) as max FROM (
-                                               SELECT count({vertex_id}) AS 
count
-                                               FROM {out_table}
-                                               GROUP BY {grouping_cols}) x
-                               """.format(**locals()))[0]['max']
-                       plpy.execute("DROP TABLE IF EXISTS 
{0}".format(distinct_grp_table))
-               else:
-                       plpy.execute(
-                               """ INSERT INTO {out_table}
-                               SELECT {vertex_id} AS {vertex_id},
-                                       {init_w} AS {weight},
-                                       NULL AS parent
-                               FROM {vertex_table}
-                               WHERE {vertex_id} IS NOT NULL
-                                """.format(**locals()))
-
-                       # The source can be reached with 0 cost and it has 
itself as the
-                       # parent.
-                       plpy.execute(
-                               """ INSERT INTO {oldupdate}
-                                       
VALUES({source_vertex},0,{source_vertex})
-                               """.format(**locals()))
-
-                       v_cnt = plpy.execute(
-                               """ SELECT count(*) FROM {vertex_table}
-                               WHERE {vertex_id} IS NOT NULL
-                               """.format(**locals()))[0]['count']
-
-               for i in range(0,v_cnt+1):
-
-                       # Apply the updates calculated in the last iteration.
-                       if is_hawq:
-                               sql = """
-                               TRUNCATE TABLE {temp_table};
-                               INSERT INTO {temp_table}
-                                       SELECT *
-                                       FROM {out_table}
-                                       WHERE NOT EXISTS (
-                                               SELECT 1
-                                               FROM {oldupdate} as oldupdate
-                                               WHERE {out_table}.{vertex_id} = 
oldupdate.id
-                                               {checkg_oo})
-                                       UNION
-                                       SELECT {grp_comma} id, {weight}, parent 
FROM {oldupdate};
-                               """
-                               plpy.execute(sql.format(**locals()))
-                               plpy.execute("DROP TABLE {0}".format(out_table))
-                               plpy.execute("ALTER TABLE {0} RENAME TO {1}".
-                                       format(temp_table,out_table))
-                               sql = """ CREATE TABLE {temp_table} AS (
-                                       SELECT * FROM {out_table} LIMIT 0)
-                                       {distribution};"""
-                               plpy.execute(sql.format(**locals()))
-                               ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
-                                       format(oldupdate))
-                       else:
-                               sql = """
-                               UPDATE {out_table} SET
-                               {weight}=oldupdate.{weight},
-                               parent=oldupdate.parent
-                               FROM
-                               {oldupdate} AS oldupdate
-                               WHERE
-                               {out_table}.{vertex_id}=oldupdate.id AND
-                               {out_table}.{weight}>oldupdate.{weight} 
{checkg_oo}
-                               """
-                               ret = plpy.execute(sql.format(**locals()))
-
-                       if ret.nrows() == 0:
-                               break
-
-                       plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
-
-                       # 'oldupdate' table has the update info from the last 
iteration
-
-                       # Consider every edge that has an updated source
-                       # From these edges:
-                       # For every destination vertex, find the min total cost 
to reach.
-                       # Note that, just calling an aggregate function with 
group by won't
-                       # let us store the src field of the edge (needed for 
the parent).
-                       # This is why we need the 'x'; it gives a list of 
destinations and
-                       # associated min values. Using these values, we 
identify which edge
-                       # is selected.
-
-                       # Since using '=' with floats is dangerous we use an 
epsilon value
-                       # for comparison.
-
-                       # Once we have a list of edges and values (stores as 
'message'),
-                       # we check if these values are lower than the existing 
shortest
-                       # path values.
-
-                       sql = (""" INSERT INTO {newupdate}
-                               SELECT DISTINCT ON (message.id {comma_grp})
-                                       message.id AS id,
-                                       message.{weight} AS {weight},
-                                       message.parent AS parent {comma_grp_m}
-                               FROM {out_table} AS out_table INNER JOIN
-                                       (
-                                       SELECT {edge_table}.{dest} AS id, 
x.{weight} AS {weight},
-                                               oldupdate.id AS parent 
{comma_grp_e}
-                                       FROM {oldupdate} AS oldupdate INNER JOIN
-                                               {edge_table}  ON
-                                                       ({edge_table}.{src} = 
oldupdate.id {checkg_eo})
-                                               INNER JOIN
-                                               (
-                                               SELECT {edge_table}.{dest} AS 
id,
-                                                       min(oldupdate.{weight} +
-                                                               
{edge_table}.{weight}) AS {weight} {comma_grp_e}
-                                               FROM {oldupdate} AS oldupdate 
INNER JOIN
-                                                       {edge_table}  ON
-                                                       
({edge_table}.{src}=oldupdate.id {checkg_eo})
-                                               GROUP BY {edge_table}.{dest} 
{comma_grp_e}
-                                               ) x
-                                               ON ({edge_table}.{dest} = x.id 
{checkg_ex} )
-                                       WHERE ABS(oldupdate.{weight} + 
{edge_table}.{weight}
-                                                               - x.{weight}) < 
{EPSILON}
-                                       ) message
-                                       ON (message.id = out_table.{vertex_id} 
{checkg_om})
-                               WHERE message.{weight}<out_table.{weight}
-                               """.format(**locals()))
-
-                       plpy.execute(sql)
-
-                       # Swap the update tables for the next iteration.
-                       tmp = oldupdate
-                       oldupdate = newupdate
-                       newupdate = tmp
-
-               plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
-               # The algorithm should converge in less than |V| iterations.
-               # Otherwise there is a negative cycle in the graph.
-               if i == v_cnt:
-                       if grouping_cols is None:
-                               plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
-                                       format(out_table, out_table+"_summary", 
oldupdate))
-                               if is_hawq:
-                                       plpy.execute("DROP TABLE IF EXISTS 
{0}".format(temp_table))
-                               plpy.error("Graph SSSP: Detected a negative 
cycle in the graph.")
-
-                       # It is possible that not all groups has negative 
cycles.
-                       else:
-
-                               # negs is the string created by collating 
grouping columns.
-                               # By looking at the oldupdate table we can see 
which groups
-                               # are in a negative cycle.
-
-                               negs = plpy.execute(
-                                       """ SELECT array_agg(DISTINCT 
({grouping_cols})) AS grp
-                                               FROM {oldupdate}
-                                       """.format(**locals()))[0]['grp']
-
-                               # Delete the groups with negative cycles from 
the output table.
-                               if is_hawq:
-                                       sql_del = """
-                                               TRUNCATE TABLE {temp_table};
-                                               INSERT INTO {temp_table}
-                                                       SELECT *
-                                                       FROM {out_table}
-                                                       WHERE NOT EXISTS(
-                                                               SELECT 1
-                                                               FROM 
{oldupdate} as oldupdate
-                                                               WHERE 
{checkg_oo_sub}
-                                                               );"""
-                                       plpy.execute(sql_del.format(**locals()))
-                                       plpy.execute("DROP TABLE 
{0}".format(out_table))
-                                       plpy.execute("ALTER TABLE {0} RENAME TO 
{1}".
-                                               format(temp_table,out_table))
-                               else:
-                                       sql_del = """ DELETE FROM {out_table}
-                                               USING {oldupdate} AS oldupdate
-                                               WHERE {checkg_oo_sub}"""
-                                       plpy.execute(sql_del.format(**locals()))
-
-                               # If every group has a negative cycle,
-                               # drop the output table as well.
-                               if table_is_empty(out_table):
-                                       plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".
-                                               
format(out_table,out_table+"_summary"))
-
-                               plpy.warning(
-                                       """Graph SSSP: Detected a negative 
cycle in the """ +
-                                       """sub-graphs of following groups: 
{0}.""".
-                                       format(str(negs)[1:-1]))
-
-               plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
-               if is_hawq:
-                       plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
-                               format(**locals()))
-
-       return None
+    with MinWarning("warning"):
+
+        INT_MAX = 2147483647
+        INFINITY = "'Infinity'"
+        EPSILON = 0.000001
+
+        message = unique_string(desp='message')
+
+        oldupdate = unique_string(desp='oldupdate')
+        newupdate = unique_string(desp='newupdate')
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args,
+                                              params_types,
+                                              default_args)
+
+        # Prepare the input for recording in the summary table
+        if vertex_id is None:
+            v_st = "NULL"
+            vertex_id = "id"
+        else:
+            v_st = vertex_id
+        if edge_args is None:
+            e_st = "NULL"
+        else:
+            e_st = edge_args
+        if grouping_cols is None:
+            g_st = "NULL"
+            glist = None
+        else:
+            g_st = grouping_cols
+            glist = split_quoted_delimited_str(grouping_cols)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        distribution = '' if is_platform_pg() else "DISTRIBUTED BY 
({0})".format(vertex_id)
+        local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)"
+
+        is_hawq = is_platform_hawq()
+
+        _validate_sssp(vertex_table, vertex_id, edge_table,
+                       edge_params, source_vertex, out_table, glist)
+
+        plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
+            message, oldupdate, newupdate))
+
+        # Initialize grouping related variables
+        comma_grp = ""
+        comma_grp_e = ""
+        comma_grp_m = ""
+        grp_comma = ""
+        checkg_oo = ""
+        checkg_eo = ""
+        checkg_ex = ""
+        checkg_om = ""
+        group_by = ""
+
+        if grouping_cols is not None:
+            comma_grp = " , " + grouping_cols
+            group_by = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_e = " , " + _grp_from_table(edge_table, glist)
+            comma_grp_m = " , " + _grp_from_table("message", glist)
+            grp_comma = grouping_cols + " , "
+
+            checkg_oo_sub = _check_groups(out_table, "oldupdate", glist)
+            checkg_oo = " AND " + checkg_oo_sub
+            checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist)
+            checkg_ex = " AND " + _check_groups(edge_table, "x", glist)
+            checkg_om = " AND " + _check_groups("out_table", "message", glist)
+
+        w_type = get_expr_type(weight, edge_table).lower()
+        init_w = INT_MAX
+        if w_type in ['real', 'double precision', 'float8']:
+            init_w = INFINITY
+
+        # We keep a table of every vertex, the minimum cost to that destination
+        # seen so far and the parent to this vertex in the associated shortest
+        # path. This table will be updated throughout the execution.
+        plpy.execute(
+            """ CREATE TABLE {out_table} AS (SELECT
+                    {grp_comma} {src} AS {vertex_id}, {weight},
+                    {src} AS parent FROM {edge_table} LIMIT 0)
+                {distribution} """.format(**locals()))
+
+        # We keep a summary table to keep track of the parameters used for this
+        # SSSP run. This table is used in the path finding function to 
eliminate
+        # the need for repetition.
+        plpy.execute(""" CREATE TABLE {out_table}_summary  (
+            vertex_table            TEXT,
+            vertex_id               TEXT,
+            edge_table              TEXT,
+            edge_args               TEXT,
+            source_vertex           INTEGER,
+            out_table               TEXT,
+            grouping_cols           TEXT)
+            """.format(**locals()))
+        plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+            {source_vertex}, '{out_table}', '{g_st}')
+            """.format(**locals()))
+
+        # We keep 2 update tables and alternate them during the execution.
+        # This is necessary since we need to know which vertices are updated in
+        # the previous iteration to calculate the next set of updates.
+        plpy.execute(
+            """ CREATE TEMP TABLE {oldupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+        plpy.execute(
+            """ CREATE TEMP TABLE {newupdate} AS (SELECT
+                    {src} AS id, {weight},
+                    {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0)
+                {local_distribution}
+                """.format(**locals()))
+
+        # Since HAWQ does not allow us to update, we create a new table and
+        # rename at every iteration.
+        if is_hawq:
+            temp_table = unique_string(desp='temp')
+            sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} )
+                {distribution} """
+            plpy.execute(sql.format(**locals()))
+
+        # GPDB and HAWQ have distributed by clauses to help them with indexing.
+        # For Postgres we add the indices manually.
+        if is_platform_pg():
+            plpy.execute("""
+                CREATE INDEX ON {out_table} ({vertex_id});
+                CREATE INDEX ON {oldupdate} (id);
+                CREATE INDEX ON {newupdate} (id);
+            """.format(**locals()))
+
+        # The initialization step is quite different when grouping is involved
+        # since not every group (subgraph) will have the same set of vertices.
+
+        # Example:
+        # Assume there are two grouping columns g1 and g2
+        # g1 values are 0 and 1. g2 values are 5 and 6
+        if grouping_cols is not None:
+
+            distinct_grp_table = unique_string(desp='grp')
+            plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}".
+                         format(**locals()))
+            plpy.execute("""
+                CREATE TEMP TABLE {distinct_grp_table} AS
+                SELECT DISTINCT {grouping_cols} FROM {edge_table}
+                """.format(**locals()))
+            subq = unique_string(desp='subquery')
+
+            checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist)
+            grp_d_comma = _grp_from_table(distinct_grp_table, glist) + ","
+
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM {distinct_grp_table} INNER JOIN
+                    (
+                    SELECT {src} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    UNION
+                    SELECT {dest} AS {vertex_id} {comma_grp}
+                    FROM {edge_table}
+                    ) {subq} ON ({checkg_ds_sub})
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                SELECT {source_vertex}, 0, {source_vertex},
+                       {grouping_cols}
+                FROM {distinct_grp_table}
+                """.format(**locals()))
+
+            # The maximum number of vertices for any group.
+            # Used for determining negative cycles.
+            v_cnt = plpy.execute("""
+                SELECT max(count) as max FROM (
+                    SELECT count({vertex_id}) AS count
+                    FROM {out_table}
+                    GROUP BY {grouping_cols}) x
+                """.format(**locals()))[0]['max']
+            plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
+        else:
+            plpy.execute("""
+                INSERT INTO {out_table}
+                SELECT {vertex_id} AS {vertex_id},
+                    {init_w} AS {weight},
+                    NULL AS parent
+                FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))
+
+            # The source can be reached with 0 cost and it has itself as the
+            # parent.
+            plpy.execute("""
+                INSERT INTO {oldupdate}
+                VALUES({source_vertex},0,{source_vertex})
+                """.format(**locals()))
+
+            v_cnt = plpy.execute("""
+                SELECT count(*) FROM {vertex_table}
+                WHERE {vertex_id} IS NOT NULL
+                """.format(**locals()))[0]['count']
+
+        for i in range(0, v_cnt + 1):
+
+            # Apply the updates calculated in the last iteration.
+            if is_hawq:
+                sql = """
+                TRUNCATE TABLE {temp_table};
+                INSERT INTO {temp_table}
+                    SELECT *
+                    FROM {out_table}
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM {oldupdate} as oldupdate
+                        WHERE {out_table}.{vertex_id} = oldupdate.id
+                        {checkg_oo})
+                    UNION
+                    SELECT {grp_comma} id, {weight}, parent FROM {oldupdate};
+                """
+                plpy.execute(sql.format(**locals()))
+                plpy.execute("DROP TABLE {0}".format(out_table))
+                plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                             format(temp_table, out_table))
+                sql = """ CREATE TABLE {temp_table} AS (
+                    SELECT * FROM {out_table} LIMIT 0)
+                    {distribution};"""
+                plpy.execute(sql.format(**locals()))
+                ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+                                   format(oldupdate))
+            else:
+                sql = """
+                UPDATE {out_table} SET
+                    {weight} = oldupdate.{weight},
+                    parent = oldupdate.parent
+                FROM
+                    {oldupdate} AS oldupdate
+                WHERE
+                    {out_table}.{vertex_id} = oldupdate.id AND
+                    {out_table}.{weight} > oldupdate.{weight} {checkg_oo}
+                """
+                ret = plpy.execute(sql.format(**locals()))
+
+            if ret.nrows() == 0:
+                break
+
+            plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
+
+            # 'oldupdate' table has the update info from the last iteration
+
+            # Consider every edge that has an updated source
+            # From these edges:
+            # For every destination vertex, find the min total cost to reach.
+            # Note that, just calling an aggregate function with group by won't
+            # let us store the src field of the edge (needed for the parent).
+            # This is why we need the 'x'; it gives a list of destinations and
+            # associated min values. Using these values, we identify which edge
+            # is selected.
+
+            # Since using '=' with floats is dangerous we use an epsilon value
+            # for comparison.
+
+            # Once we have a list of edges and values (stored as 'message'),
+            # we check if these values are lower than the existing shortest
+            # path values.
+
+            sql = (""" INSERT INTO {newupdate}
+                SELECT DISTINCT ON (message.id {comma_grp})
+                    message.id AS id,
+                    message.{weight} AS {weight},
+                    message.parent AS parent {comma_grp_m}
+                FROM {out_table} AS out_table INNER JOIN
+                    (
+                    SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
+                        oldupdate.id AS parent {comma_grp_e}
+                    FROM {oldupdate} AS oldupdate INNER JOIN
+                        {edge_table}  ON
+                            ({edge_table}.{src} = oldupdate.id {checkg_eo})
+                        INNER JOIN
+                        (
+                        SELECT {edge_table}.{dest} AS id,
+                            min(oldupdate.{weight} +
+                                {edge_table}.{weight}) AS {weight} 
{comma_grp_e}
+                        FROM {oldupdate} AS oldupdate INNER JOIN
+                            {edge_table}  ON
+                            ({edge_table}.{src}=oldupdate.id {checkg_eo})
+                        GROUP BY {edge_table}.{dest} {comma_grp_e}
+                        ) x
+                        ON ({edge_table}.{dest} = x.id {checkg_ex} )
+                    WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
+                                - x.{weight}) < {EPSILON}
+                    ) message
+                    ON (message.id = out_table.{vertex_id} {checkg_om})
+                WHERE message.{weight}<out_table.{weight}
+                """.format(**locals()))
+
+            plpy.execute(sql)
+
+            # Swap the update tables for the next iteration.
+            tmp = oldupdate
+            oldupdate = newupdate
+            newupdate = tmp
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+        # The algorithm should converge in less than |V| iterations.
+        # Otherwise there is a negative cycle in the graph.
+        if i == v_cnt:
+            if grouping_cols is None:
+                plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+                             format(out_table, out_table + "_summary", 
oldupdate))
+                if is_hawq:
+                    plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table))
+                plpy.error("Graph SSSP: Detected a negative cycle in the 
graph.")
+
+            # It is possible that not all groups have negative cycles.
+            else:
+
+                # negs is the string created by collating grouping columns.
+                # By looking at the oldupdate table we can see which groups
+                # are in a negative cycle.
+
+                negs = plpy.execute(
+                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+                        FROM {oldupdate}
+                    """.format(**locals()))[0]['grp']
+
+                # Delete the groups with negative cycles from the output table.
+                if is_hawq:
+                    sql_del = """
+                        TRUNCATE TABLE {temp_table};
+                        INSERT INTO {temp_table}
+                            SELECT *
+                            FROM {out_table}
+                            WHERE NOT EXISTS(
+                                SELECT 1
+                                FROM {oldupdate} as oldupdate
+                                WHERE {checkg_oo_sub}
+                                );"""
+                    plpy.execute(sql_del.format(**locals()))
+                    plpy.execute("DROP TABLE {0}".format(out_table))
+                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                                 format(temp_table, out_table))
+                else:
+                    sql_del = """ DELETE FROM {out_table}
+                        USING {oldupdate} AS oldupdate
+                        WHERE {checkg_oo_sub}"""
+                    plpy.execute(sql_del.format(**locals()))
+
+                # If every group has a negative cycle,
+                # drop the output table as well.
+                if table_is_empty(out_table):
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                                 format(out_table, out_table + "_summary"))
+
+                plpy.warning(
+                    """Graph SSSP: Detected a negative cycle in the """ +
+                    """sub-graphs of following groups: {0}.""".
+                    format(str(negs)[1:-1]))
+
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+        if is_hawq:
+            plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+                         format(**locals()))
+    return None
+
 
 def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
-       **kwargs):
-       """
+                        **kwargs):
+    """
     Helper function that can be used to get the shortest path for a vertex
     Args:
         @param sssp_table   Name of the table that contains the SSSP output.
@@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, 
dest_vertex, path_table,
                             desired path.
         @param path_table   Name of the output table that contains the path.
 
-       """
-       with MinWarning("warning"):
-               _validate_get_path(sssp_table, dest_vertex, path_table)
-
-               temp1_name = unique_string(desp='temp1')
-               temp2_name = unique_string(desp='temp2')
-
-               select_grps = ""
-               check_grps_t1 = ""
-               check_grps_t2 = ""
-               grp_comma = ""
-               tmp = ""
-
-               summary = plpy.execute("SELECT * FROM 
{0}_summary".format(sssp_table))
-               vertex_id = summary[0]['vertex_id']
-               source_vertex = summary[0]['source_vertex']
-
-               if vertex_id == "NULL":
-                       vertex_id = "id"
-
-               grouping_cols = summary[0]['grouping_cols']
-               if grouping_cols == "NULL":
-                       grouping_cols = None
-
-               if grouping_cols is not None:
-                       glist = split_quoted_delimited_str(grouping_cols)
-                       select_grps = _grp_from_table(sssp_table,glist) + " , "
-                       check_grps_t1 = " AND " + _check_groups(
-                               sssp_table,temp1_name,glist)
-                       check_grps_t2 = " AND " + _check_groups(
-                               sssp_table,temp2_name,glist)
-
-                       grp_comma = grouping_cols + " , "
-
-               if source_vertex == dest_vertex:
-                       plpy.execute("""
-                               CREATE TABLE {path_table} AS
-                               SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] 
AS path
-                               FROM {sssp_table} WHERE {vertex_id} = 
{dest_vertex}
-                               """.format(**locals()))
-                       return
-
-               plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
-                       format(temp1_name,temp2_name));
-               out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
-                               SELECT {grp_comma} {sssp_table}.parent AS 
{vertex_id},
-                                       ARRAY[{dest_vertex}] AS path
-                               FROM {sssp_table}
-                               WHERE {vertex_id} = {dest_vertex}
-                                       AND {sssp_table}.parent IS NOT NULL
-                       """.format(**locals()))
-
-               plpy.execute("""
-                       CREATE TEMP TABLE {temp2_name} AS
-                               SELECT * FROM {temp1_name} LIMIT 0
-                       """.format(**locals()))
-
-               # Follow the 'parent' chain until you reach the source.
-               while out.nrows() > 0:
-
-                       plpy.execute("TRUNCATE TABLE 
{temp2_name}".format(**locals()))
-                       # If the vertex id is not the source vertex,
-                       # Add it to the path and move to its parent
-                       out = plpy.execute(
-                               """ INSERT INTO {temp2_name}
-                               SELECT {select_grps} {sssp_table}.parent AS 
{vertex_id},
-                                       {sssp_table}.{vertex_id} || 
{temp1_name}.path AS path
-                               FROM {sssp_table} INNER JOIN {temp1_name} ON
-                                       ({sssp_table}.{vertex_id} = 
{temp1_name}.{vertex_id}
-                                               {check_grps_t1})
-                               WHERE {source_vertex} <> 
{sssp_table}.{vertex_id}
-                               """.format(**locals()))
-
-                       tmp = temp2_name
-                       temp2_name = temp1_name
-                       temp1_name = tmp
-
-                       tmp = check_grps_t1
-                       check_grps_t1 = check_grps_t2
-                       check_grps_t2 = tmp
-
-               # Add the source vertex to the beginning of every path and
-               # add the empty arrays for the groups that don't have a path to 
reach
-               # the destination vertex
-               plpy.execute("""
-                       CREATE TABLE {path_table} AS
-                       SELECT {grp_comma} {source_vertex} || path AS path
-                       FROM {temp2_name}
-                       UNION
-                       SELECT {grp_comma} '{{}}'::INT[] AS path
-                       FROM {sssp_table}
-                       WHERE {vertex_id} = {dest_vertex}
-                               AND {sssp_table}.parent IS NULL
-                       """.format(**locals()))
-
-               out = plpy.execute("SELECT 1 FROM {0} LIMIT 
1".format(path_table))
-
-               if out.nrows() == 0:
-                       plpy.error(
-                               "Graph SSSP: Vertex {0} is not present in the 
SSSP table {1}".
-                               format(dest_vertex,sssp_table))
-
-               plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
-                       format(**locals()))
-
-       return None
+    """
+    with MinWarning("warning"):
+        _validate_get_path(sssp_table, dest_vertex, path_table)
+
+        temp1_name = unique_string(desp='temp1')
+        temp2_name = unique_string(desp='temp2')
+
+        select_grps = ""
+        check_grps_t1 = ""
+        check_grps_t2 = ""
+        grp_comma = ""
+        tmp = ""
+
+        summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
+        vertex_id = summary[0]['vertex_id']
+        source_vertex = summary[0]['source_vertex']
+
+        if vertex_id == "NULL":
+            vertex_id = "id"
+
+        grouping_cols = summary[0]['grouping_cols']
+        if grouping_cols == "NULL":
+            grouping_cols = None
+
+        if grouping_cols is not None:
+            glist = split_quoted_delimited_str(grouping_cols)
+            select_grps = _grp_from_table(sssp_table, glist) + " , "
+            check_grps_t1 = " AND " + _check_groups(
+                sssp_table, temp1_name, glist)
+            check_grps_t2 = " AND " + _check_groups(
+                sssp_table, temp2_name, glist)
+
+            grp_comma = grouping_cols + " , "
+
+        if source_vertex == dest_vertex:
+            plpy.execute("""
+                CREATE TABLE {path_table} AS
+                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+                FROM {sssp_table} WHERE {vertex_id} = {dest_vertex}
+                """.format(**locals()))
+            return
+
+        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                     format(temp1_name, temp2_name))
+        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                SELECT {grp_comma} {sssp_table}.parent AS {vertex_id},
+                    ARRAY[{dest_vertex}] AS path
+                FROM {sssp_table}
+                WHERE {vertex_id} = {dest_vertex}
+                    AND {sssp_table}.parent IS NOT NULL
+            """.format(**locals()))
+
+        plpy.execute("""
+            CREATE TEMP TABLE {temp2_name} AS
+                SELECT * FROM {temp1_name} LIMIT 0
+            """.format(**locals()))
+
+        # Follow the 'parent' chain until you reach the source.
+        while out.nrows() > 0:
+
+            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+            # If the vertex id is not the source vertex,
+            # Add it to the path and move to its parent
+            out = plpy.execute(
+                """ INSERT INTO {temp2_name}
+                SELECT {select_grps} {sssp_table}.parent AS {vertex_id},
+                    {sssp_table}.{vertex_id} || {temp1_name}.path AS path
+                FROM {sssp_table} INNER JOIN {temp1_name} ON
+                    ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id}
+                        {check_grps_t1})
+                WHERE {source_vertex} <> {sssp_table}.{vertex_id}
+                """.format(**locals()))
+
+            tmp = temp2_name
+            temp2_name = temp1_name
+            temp1_name = tmp
+
+            tmp = check_grps_t1
+            check_grps_t1 = check_grps_t2
+            check_grps_t2 = tmp
+
+        # Add the source vertex to the beginning of every path and
+        # add the empty arrays for the groups that don't have a path to reach
+        # the destination vertex
+        plpy.execute("""
+            CREATE TABLE {path_table} AS
+            SELECT {grp_comma} {source_vertex} || path AS path
+            FROM {temp2_name}
+            UNION
+            SELECT {grp_comma} '{{}}'::INT[] AS path
+            FROM {sssp_table}
+            WHERE {vertex_id} = {dest_vertex}
+                AND {sssp_table}.parent IS NULL
+            """.format(**locals()))
+
+        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+        if out.nrows() == 0:
+            plpy.error(
+                "Graph SSSP: Vertex {0} is not present in the SSSP table {1}".
+                format(dest_vertex, sssp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+                     format(**locals()))
+    return None
 
 
 def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
-       source_vertex, out_table, glist, **kwargs):
+                   source_vertex, out_table, glist, **kwargs):
 
-       validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-               out_table,'SSSP')
+    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+                          out_table, 'SSSP')
 
-       _assert(isinstance(source_vertex,int),
-               """Graph SSSP: Source vertex {source_vertex} has to be an 
integer.""".
-               format(**locals()))
-       src_exists = plpy.execute("""
-               SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
-               """.format(**locals()))
+    _assert(isinstance(source_vertex, int),
+            """Graph SSSP: Source vertex {source_vertex} has to be an 
integer.""".
+            format(**locals()))
+    src_exists = plpy.execute("""
+        SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
+        """.format(**locals()))
 
-       if src_exists.nrows() == 0:
-               plpy.error(
-                       """Graph SSSP: Source vertex {source_vertex} is not 
present in the vertex table {vertex_table}.""".
-                       format(**locals()))
+    if src_exists.nrows() == 0:
+        plpy.error("Graph SSSP: Source vertex {source_vertex} is not present "
+                   "in the vertex table {vertex_table}.".format(**locals()))
 
-       vt_error = plpy.execute(
-               """ SELECT {vertex_id}
-                       FROM {vertex_table}
-                       WHERE {vertex_id} IS NOT NULL
-                       GROUP BY {vertex_id}
-                       HAVING count(*) > 1 """.format(**locals()))
+    vt_error = plpy.execute("""
+        SELECT {vertex_id}
+        FROM {vertex_table}
+        WHERE {vertex_id} IS NOT NULL
+        GROUP BY {vertex_id}
+        HAVING count(*) > 1
+        """.format(**locals()))
 
-       if vt_error.nrows() != 0:
-               plpy.error(
-                       """Graph SSSP: Source vertex table {vertex_table} 
contains duplicate vertex id's.""".
-                       format(**locals()))
+    if vt_error.nrows() != 0:
+        plpy.error("Graph SSSP: Source vertex table {vertex_table} "
+                   "contains duplicate vertex id's.".format(**locals()))
 
-       _assert(not table_exists(out_table+"_summary"),
-               "Graph SSSP: Output summary table already exists!")
+    _assert(not table_exists(out_table + "_summary"),
+            "Graph SSSP: Output summary table already exists!")
 
-       if glist is not None:
-               _assert(columns_exist_in_table(edge_table, glist),
-                       """Graph SSSP: Not all columns from {glist} are present 
in edge table ({edge_table}).""".
-                       format(**locals()))
+    if glist is not None:
+        _assert(columns_exist_in_table(edge_table, glist),
+                "Graph SSSP: Not all columns from {glist} are present in "
+                "edge table ({edge_table}).".format(**locals()))
+    return None
 
-       return None
 
 def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
 
-       _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
-               "Graph SSSP: Invalid SSSP table name!")
-       _assert(table_exists(sssp_table),
-               "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
-       _assert(not table_is_empty(sssp_table),
-               "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
+    _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
+            "Graph SSSP: Invalid SSSP table name!")
+    _assert(table_exists(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table))
+    _assert(not table_is_empty(sssp_table),
+            "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
 
-       summary = sssp_table+"_summary"
-       _assert(table_exists(summary),
-               "Graph SSSP: SSSP summary table ({0}) is 
missing!".format(summary))
-       _assert(not table_is_empty(summary),
-               "Graph SSSP: SSSP summary table ({0}) is 
empty!".format(summary))
+    summary = sssp_table + "_summary"
+    _assert(table_exists(summary),
+            "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary))
+    _assert(not table_is_empty(summary),
+            "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary))
 
-       _assert(not table_exists(path_table),
-               "Graph SSSP: Output path table already exists!")
+    _assert(not table_exists(path_table),
+            "Graph SSSP: Output path table already exists!")
+
+    return None
 
-       return None
 
 def graph_sssp_help(schema_madlib, message, **kwargs):
-       """
-       Help function for graph_sssp and graph_sssp_get_path
-
-       Args:
-               @param schema_madlib
-               @param message: string, Help message string
-               @param kwargs
-
-       Returns:
-           String. Help/usage information
-       """
-       if not message:
-               help_string = """
+    """
+    Help function for graph_sssp and graph_sssp_get_path
+
+    Args:
+        @param schema_madlib
+        @param message: string, Help message string
+        @param kwargs
+
+    Returns:
+        String. Help/usage information
+    """
+    if not message:
+        help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -640,8 +639,8 @@ weights of its constituent edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_sssp('usage')
             """
-       elif message.lower() in ['usage', 'help', '?']:
-               help_string = """
+    elif message.lower() in ['usage', 'help', '?']:
+        help_string = """
 Given a graph and a source vertex, single source shortest path (SSSP)
 algorithm finds a path for every vertex such that the sum of the
 weights of its constituent edges is minimized.
@@ -651,8 +650,8 @@ weights of its constituent edges is minimized.
 To retrieve the path for a specific vertex:
 
  SELECT {schema_madlib}.graph_sssp_get_path(
-    sssp_table TEXT, -- Name of the table that contains the SSSP output.
-    dest_vertex        INT,  -- The vertex that will be the destination of the
+    sssp_table  TEXT, -- Name of the table that contains the SSSP output.
+    dest_vertex INT,  -- The vertex that will be the destination of the
                       -- desired path.
     path_table  TEXT  -- Name of the output table that contains the path.
 );
@@ -679,8 +678,8 @@ every group and has the following columns:
   - path (ARRAY)  : The shortest path from the source vertex (as specified
                   in the SSSP execution) to the destination vertex.
 """
-       elif message.lower() in ("example", "examples"):
-               help_string = """
+    elif message.lower() in ("example", "examples"):
+        help_string = """
 ----------------------------------------------------------------------------
                                 EXAMPLES
 ----------------------------------------------------------------------------
@@ -723,12 +722,12 @@ INSERT INTO edge VALUES
 -- Compute the SSSP:
 DROP TABLE IF EXISTS out;
 SELECT madlib.graph_sssp(
-       'vertex',                            -- Vertex table
-       'id',                                -- Vertix id column
-       'edge',                              -- Edge table
-       'src=src, dest=dest, weight=weight', -- Comma delimted string of edge 
arguments
-        0,                                  -- The source vertex
-       'out'                                -- Output table of SSSP
+    'vertex',                            -- Vertex table
+    'id',                                -- Vertex id column
+    'edge',                              -- Edge table
+    'src=src, dest=dest, weight=weight', -- Comma delimited string of edge 
arguments
+     0,                                  -- The source vertex
+    'out'                                -- Output table of SSSP
 );
 -- View the SSSP costs for every vertex:
 SELECT * FROM out ORDER BY id;
@@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES
 DROP TABLE IF EXISTS out_gr, out_gr_summary;
 SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
 """
-       else:
-               help_string = "No such option. Use {schema_madlib}.graph_sssp()"
-
-       return help_string.format(schema_madlib=schema_madlib,
-               graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
-    """source_vertex INT,  -- The source vertex id for the algorithm to start.
-    out_table     TEXT, -- Name of the table to store the result of SSSP.
-    grouping_cols TEXT  -- The list of grouping columns."""))
+    else:
+        help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+
+    common_usage_string = get_graph_usage(
+        schema_madlib, 'graph_sssp',
+        """source_vertex INT,  -- The source vertex id for the algorithm to 
start.
+            out_table     TEXT, -- Name of the table to store the result of 
SSSP.
+            grouping_cols TEXT  -- The list of grouping columns.""")
+    return help_string.format(schema_madlib=schema_madlib,
+                              graph_usage=common_usage_string)
 # ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/wcc.py_in 
b/src/ports/postgres/modules/graph/wcc.py_in
index 02cceeb..1f6a81f 100644
--- a/src/ports/postgres/modules/graph/wcc.py_in
+++ b/src/ports/postgres/modules/graph/wcc.py_in
@@ -31,34 +31,37 @@ import plpy
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string, split_quoted_delimited_str
-from utilities.validate_args import columns_exist_in_table, get_cols_and_types
-from graph_utils import *
+from utilities.validate_args import columns_exist_in_table
+from utilities.utilities import is_platform_pg, is_platform_hawq
+from graph_utils import validate_graph_coding, get_graph_usage
 
-m4_changequote(`<!', `!>')
 
 def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, module_name):
+                      edge_params, out_table, grouping_cols_list, module_name):
     """
     Function to validate input parameters for wcc
     """
     validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-        out_table, module_name)
+                          out_table, module_name)
     if grouping_cols_list:
         # validate the grouping columns. We currently only support 
grouping_cols
         # to be column names in the edge_table, and not expressions!
         _assert(columns_exist_in_table(edge_table, grouping_cols_list, 
schema_madlib),
-                "Weakly Connected Components error: One or more grouping 
columns specified do not exist!")
+                "Weakly Connected Components error: "
+                "One or more grouping columns specified do not exist!")
 
 
 def prefix_tablename_to_colnames(table, cols_list):
     return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list])
 
+
 def get_where_condition(table1, table2, cols_list):
     return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col)
-            for col in cols_list])
+                        for col in cols_list])
+
 
 def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
-    out_table, grouping_cols, **kwargs):
+        out_table, grouping_cols, **kwargs):
     """
     Function that computes the wcc
 
@@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
     plpy.execute('SET client_min_messages TO warning')
     params_types = {'src': str, 'dest': str}
     default_args = {'src': 'src', 'dest': 'dest'}
-    edge_params = extract_keyvalue_params(edge_args,
-            params_types, default_args)
+    edge_params = extract_keyvalue_params(edge_args, params_types, 
default_args)
 
     # populate default values for optional params if null
     if vertex_id is None:
@@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
 
     grouping_cols_list = split_quoted_delimited_str(grouping_cols)
     validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table,
-        edge_params, out_table, grouping_cols_list, 'Weakly Connected 
Components')
+                      edge_params, out_table, grouping_cols_list,
+                      'Weakly Connected Components')
     src = edge_params["src"]
     dest = edge_params["dest"]
 
@@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
     toupdate = unique_string(desp='toupdate')
     temp_out_table = unique_string(desp='tempout')
 
-    distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+    distribution = '' if is_platform_pg() else "DISTRIBUTED BY 
({0})".format(vertex_id)
     subq_prefixed_grouping_cols = ''
     comma_toupdate_prefixed_grouping_cols = ''
     comma_oldupdate_prefixed_grouping_cols = ''
     old_new_update_where_condition = ''
     new_to_update_where_condition = ''
     edge_to_update_where_condition = ''
-    is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+    is_hawq = is_platform_hawq()
 
     INT_MAX = 2147483647
     component_id = 'component_id'
+    grouping_cols_comma = '' if not grouping_cols else grouping_cols + ','
+
     if grouping_cols:
-        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-        <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>)
+        distribution = ('' if is_platform_pg() else
+                        "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, 
vertex_id))
         # Update some variables useful for grouping based query strings
         subq = unique_string(desp='subquery')
         distinct_grp_table = unique_string(desp='grptable')
@@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
                 CREATE TABLE {distinct_grp_table} AS
                 SELECT DISTINCT {grouping_cols} FROM {edge_table}
             """.format(**locals()))
-        comma_toupdate_prefixed_grouping_cols = ', ' + 
prefix_tablename_to_colnames(toupdate,
-            grouping_cols_list)
-        comma_oldupdate_prefixed_grouping_cols = ', ' + 
prefix_tablename_to_colnames(
-            oldupdate, grouping_cols_list)
-        subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq,
-            grouping_cols_list)
-        old_new_update_where_condition = ' AND ' + get_where_condition(
-            oldupdate, newupdate, grouping_cols_list)
-        new_to_update_where_condition = ' AND ' + get_where_condition(
-            newupdate, toupdate, grouping_cols_list)
-        edge_to_update_where_condition = ' AND ' + get_where_condition(
-            edge_table, toupdate, grouping_cols_list)
+
+        pttc = prefix_tablename_to_colnames
+        gwc = get_where_condition
+
+        comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, 
grouping_cols_list)
+        comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, 
grouping_cols_list)
+        subq_prefixed_grouping_cols = pttc(subq, grouping_cols_list)
+        old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, 
grouping_cols_list)
+        new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, 
grouping_cols_list)
+        edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, 
grouping_cols_list)
+        join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list)
+        group_by_clause = ('' if not grouping_cols else
+                           '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols,
+                                                 subq, vertex_id))
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
                 SELECT {subq}.{vertex_id},
@@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
                 ON {join_grouping_cols}
                 GROUP BY {group_by_clause}
                 {distribution}
-            """.format(select_grouping_cols=','+subq_prefixed_grouping_cols,
-                join_grouping_cols=get_where_condition(subq,
-                    distinct_grp_table, grouping_cols_list),
-                group_by_clause='' if not grouping_cols else
-                    subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, 
vertex_id),
-                select_grouping_cols_clause='' if not grouping_cols else
-                    grouping_cols+', ', **locals()))
+            """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols,
+                       select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
         plpy.execute("""
                 CREATE TEMP TABLE {message} AS
                 SELECT {vertex_id},
@@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
                         {select_grouping_cols_clause}
                 FROM {newupdate}
                 {distribution}
-            """.format(select_grouping_cols_clause='' if not grouping_cols else
-                    ', '+grouping_cols, **locals()))
+            """.format(select_grouping_cols_clause=grouping_cols_comma,
+                       **locals()))
     else:
         plpy.execute("""
                 CREATE TABLE {newupdate} AS
@@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
             """.format(**locals()))
     nodes_to_update = 1
     while nodes_to_update > 0:
-        # This idea here is simple. Look at all the neighbors of a node, and
-        # assign the smallest node id among the neighbors as its component_id.
-        # The next table starts off with very high component_id (INT_MAX). The
-        # component_id of all nodes which obtain a smaller component_id after
-        # looking at its neighbors are updated in the next table. At every
-        # iteration update only those nodes whose component_id in the previous
-        # iteration are greater than what was found in the current iteration.
+        # Look at all the neighbors of a node, and assign the smallest node id
+        # among the neighbors as its component_id. The next table starts off
+        # with very high component_id (INT_MAX). The component_id of every node
+        # that obtains a smaller component_id after looking at its neighbors is
+        # updated in the next table. At every iteration update only those nodes
+        # whose component_id in the previous iteration is greater than what was
+        # found in the current iteration.
+
         plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
         plpy.execute("""
             CREATE TEMP TABLE {oldupdate} AS
@@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, 
edge_table, edge_args,
             FROM {message}
             GROUP BY {group_by_clause} {vertex_id}
             {distribution}
-        """.format(grouping_cols_select='' if not grouping_cols else
-                ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else '{0}, '.format(grouping_cols),
-                **locals()))
+        """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols),
+                   group_by_clause=grouping_cols_comma,
+                   **locals()))
 
         plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate))
         plpy.execute("""
@@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
                         SELECT * FROM {toupdate};
                 """.format(**locals()))
             plpy.execute("DROP TABLE {0}".format(newupdate))
-            plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table,
-                            newupdate))
+            plpy.execute("ALTER TABLE {0} RENAME TO {1}".
+                         format(temp_out_table, newupdate))
             plpy.execute("""
                     CREATE TABLE {temp_out_table} AS
                     SELECT * FROM {newupdate}
@@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
             ) AS t
             GROUP BY {group_by_clause} {vertex_id}
         """.format(select_grouping_cols='' if not grouping_cols
-                else ', {0}'.format(grouping_cols), group_by_clause=''
-                if not grouping_cols else ' {0}, '.format(grouping_cols),
-                **locals()))
+                   else ', {0}'.format(grouping_cols), group_by_clause=''
+                   if not grouping_cols else ' {0}, '.format(grouping_cols),
+                   **locals()))
 
         plpy.execute("DROP TABLE {0}".format(oldupdate))
         if grouping_cols:
@@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, 
edge_args,
     if is_hawq:
         plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table))
 
+
 def wcc_help(schema_madlib, message, **kwargs):
     """
     Help function for wcc
@@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs):
     if message is not None and \
             message.lower() in ("usage", "help", "?"):
         help_string = "Get from method below"
-        help_string = get_graph_usage(schema_madlib, 'Weakly Connected 
Components',
+        help_string = get_graph_usage(
+            schema_madlib,
+            'Weakly Connected Components',
             """out_table     TEXT, -- Output table of weakly connected 
components
-    grouping_col  TEXT -- Comma separated column names to group on
-                       -- (DEFAULT = NULL, no grouping)
-""")
+            grouping_col  TEXT -- Comma separated column names to group on
+                               -- (DEFAULT = NULL, no grouping)
+            """)
     else:
         if message is not None and \
                 message.lower() in ("example", "examples"):

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in 
b/src/ports/postgres/modules/utilities/utilities.py_in
index 6a5e8f9..b28a5f3 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -14,32 +14,43 @@ if __name__ != "__main__":
 m4_changequote(`<!', `!>')
 
 
+def has_function_properties():
+    """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
+    return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)
+
+
+def is_platform_pg():
+    """ __POSTGRESQL__ variable defined during configure """
+    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
+# 
------------------------------------------------------------------------------
+
+
+def is_platform_hawq():
+    """ __HAWQ__ variable defined during configure """
+    return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+# 
------------------------------------------------------------------------------
+
+
 def get_seg_number():
     """ Find out how many primary segments exist in the distribution
         Might be useful for partitioning data.
     """
-    m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <!
-    return plpy.execute(
-        """
-        SELECT count(*) from gp_segment_configuration
-        WHERE role = 'p'
-        """)[0]['count']
-    !>)
+    if is_platform_pg():
+        return 1
+    else:
+        return plpy.execute("""
+            SELECT count(*) from gp_segment_configuration
+            WHERE role = 'p'
+            """)[0]['count']
 # 
------------------------------------------------------------------------------
 
 
 def is_orca():
-    m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!
-    optimizer = plpy.execute("show optimizer")[0]["optimizer"]
-    return True if optimizer == 'on' else False
-    !>, <!
+    if has_function_properties():
+        optimizer = plpy.execute("show optimizer")[0]["optimizer"]
+        if optimizer == 'on':
+            return True
     return False
-    !>)
-# 
------------------------------------------------------------------------------
-
-
-def is_platform_pg():
-    return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>)
 # 
------------------------------------------------------------------------------
 
 

Reply via email to