Graph: Update Python code to follow PEP-8

- Changed indentation to use spaces instead of tabs
- Updated to PEP-8 guidelines
- Updated to follow style guide convention
- Refactored few functions to clean code and design

Closes #148


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/d487df3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/d487df3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/d487df3c

Branch: refs/heads/master
Commit: d487df3c46f984fad6c15b8a1d3e50dc30f6b6f6
Parents: 8c9b955
Author: Rahul Iyer <ri...@apache.org>
Authored: Fri Jul 7 22:23:18 2017 -0700
Committer: Rahul Iyer <ri...@apache.org>
Committed: Thu Jul 20 14:23:22 2017 -0700

----------------------------------------------------------------------
 src/ports/postgres/modules/graph/apsp.py_in     | 1294 +++++++++---------
 .../postgres/modules/graph/graph_utils.py_in    |  170 +--
 src/ports/postgres/modules/graph/pagerank.py_in |  916 +++++++------
 src/ports/postgres/modules/graph/sssp.py_in     | 1125 +++++++--------
 src/ports/postgres/modules/graph/wcc.py_in      |  119 +-
 .../postgres/modules/utilities/utilities.py_in  |   45 +-
 6 files changed, 1842 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/apsp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/apsp.py_in 
b/src/ports/postgres/modules/graph/apsp.py_in
index 1331301..ab8a566 100644
--- a/src/ports/postgres/modules/graph/apsp.py_in
+++ b/src/ports/postgres/modules/graph/apsp.py_in
@@ -38,17 +38,16 @@ from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
 from utilities.utilities import split_quoted_delimited_str
-from utilities.validate_args import get_cols
+from utilities.utilities import is_platform_pg, is_platform_hawq
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import get_expr_type
 
-m4_changequote(`<!', `!>')
 
 def graph_apsp(schema_madlib, vertex_table, vertex_id, edge_table,
-               edge_args, out_table, grouping_cols, **kwargs):
-       """
+               edge_args, out_table, grouping_cols, **kwargs):
+    """
     All Pairs shortest path function for graphs using the matrix
     multiplication based algorithm [1].
     [1] 
http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml
@@ -57,655 +56,656 @@ def graph_apsp(schema_madlib, vertex_table, vertex_id, 
edge_table,
         @param vertex_id        Name of the column containing the vertex ids.
         @param edge_table       Name of the table that contains the edge data.
         @param edge_args        A comma-delimited string containing multiple
-                                                       named arguments of the 
form "name=value".
-        @param out_table           Name of the table to store the result of 
APSP.
+                                named arguments of the form "name=value".
+        @param out_table        Name of the table to store the result of APSP.
         @param grouping_cols   The list of grouping columns.
 
     """
+    with MinWarning("warning"):
+
+        INT_MAX = 2147483647
+        INFINITY = "'Infinity'"
+        EPSILON = 0.000001
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args, params_types, 
default_args)
+
+        # Prepare the input for recording in the summary table
+        if vertex_id is None:
+            v_st = "NULL"
+            vertex_id = "id"
+        else:
+            v_st = vertex_id
+        if edge_args is None:
+            e_st = "NULL"
+        else:
+            e_st = edge_args
+        if grouping_cols is None:
+            g_st = "NULL"
+            glist = None
+        else:
+            g_st = grouping_cols
+            glist = split_quoted_delimited_str(grouping_cols)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        distribution = '' if is_platform_pg() else "DISTRIBUTED BY 
({0})".format(src)
+        is_hawq = is_platform_hawq()
+
+        _validate_apsp(vertex_table, vertex_id, edge_table,
+                       edge_params, out_table, glist)
+
+        out_table_1 = unique_string(desp='out_table_1')
+        out_table_2 = unique_string(desp='out_table_2')
+        tmp_view = unique_string(desp='tmp_view')
+        v1 = unique_string(desp='v1')
+        v2 = unique_string(desp='v2')
+        message = unique_string(desp='message')
+
+        # Initialize grouping related variables
+        comma_grp = ""
+        comma_grp_e = ""
+        comma_grp_m = ""
+        grp_comma = ""
+        grp_v1_comma = ""
+        grp_o1_comma = ""
+        grp_o_comma = ""
+        checkg_eo = ""
+        checkg_eout = ""
+        checkg_ex = ""
+        checkg_om = ""
+        checkg_o1t_sub = ""
+        checkg_ot_sub = ""
+        checkg_ot = ""
+        checkg_o1t = ""
+        checkg_vv = ""
+        checkg_o2v = ""
+        checkg_oy = ""
+        checkg_vv_sub = "TRUE"
+        grp_by = ""
+
+        if grouping_cols is not None:
+
+            # We use actual table names in some cases and aliases in others
+            # In some cases, we swap the table names so use of an alias is
+            # necessary. In other cases, they are used to simplify debugging.
+
+            comma_grp = " , " + grouping_cols
+            comma_grp_e = " , " + _grp_from_table("edge", glist)
+            comma_grp_m = " , " + _grp_from_table(message, glist)
+            grp_comma = grouping_cols + " , "
+            grp_v1_comma = _grp_from_table("v1", glist) + " , "
+            grp_o1_comma = _grp_from_table(out_table_1, glist) + " , "
+            grp_o_comma = _grp_from_table("out", glist) + " , "
+
+            checkg_eo = " AND " + _check_groups(edge_table, out_table, glist)
+            checkg_eout = " AND " + _check_groups("edge", "out", glist)
+            checkg_ex = " AND " + _check_groups("edge", "x", glist)
+            checkg_om = " AND " + _check_groups(out_table, message, glist)
+            checkg_o1t_sub = _check_groups("out", tmp_view, glist)
+            checkg_ot_sub = _check_groups(out_table, tmp_view, glist)
+            checkg_ot = " AND " + _check_groups(out_table, tmp_view, glist)
+            checkg_o1t = " AND " + _check_groups("out", "t", glist)
+            checkg_vv = " AND " + _check_groups("v1", "v2", glist)
+            checkg_o2v = " AND " + _check_groups(out_table_2, "v2", glist)
+            checkg_oy = " AND " + _check_groups("out", "y", glist)
+            checkg_vv_sub = _check_groups("v1", "v2", glist)
+            grp_by = " GROUP BY " + grouping_cols
+
+        w_type = get_expr_type(weight, edge_table).lower()
+        init_w = INT_MAX
+        if w_type in ['real', 'double precision', 'float8']:
+            init_w = INFINITY
+
+        # We keep a summary table to keep track of the parameters used for this
+        # APSP run. This table is used in the path finding function to 
eliminate
+        # the need for repetition.
+        plpy.execute(""" CREATE TABLE {out_table}_summary  (
+            vertex_table            TEXT,
+            vertex_id               TEXT,
+            edge_table              TEXT,
+            edge_args               TEXT,
+            out_table               TEXT,
+            grouping_cols           TEXT)
+            """.format(**locals()))
+        plpy.execute(""" INSERT INTO {out_table}_summary VALUES
+            ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+            '{out_table}', '{g_st}') """.format(**locals()))
+
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+        # Find all of the vertices involved with a given group
+        plpy.execute(""" CREATE VIEW {tmp_view} AS
+            SELECT {src} AS {vertex_id} {comma_grp}
+            FROM {edge_table} WHERE {src} IS NOT NULL
+            UNION
+            SELECT {dest} AS {vertex_id} {comma_grp}
+            FROM {edge_table} WHERE {dest} IS NOT NULL
+            """.format(**locals()))
+
+        # Don't use the unnecessary rows of the output table during joins.
+        ot_sql = """ SELECT * FROM {out_table}
+            WHERE {weight} != {init_w} AND {src} != {dest} """
+
+        # HAWQ does not support UPDATE so the initialization has to differ.
+        if is_hawq:
+
+            plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
+                out_table_1, out_table_2))
+            # Create 2 identical tables to swap at every iteration.
+            plpy.execute(""" CREATE TABLE {out_table_1} AS
+                SELECT {grp_comma} {src},{dest},{weight}, NULL::INT AS parent
+                FROM {edge_table} LIMIT 0 {distribution}
+                """.format(**locals()))
+            plpy.execute(""" CREATE TABLE {out_table_2} AS
+                SELECT * FROM {out_table_1} LIMIT 0 {distribution}
+                """.format(**locals()))
+
+            # The source can be reached with 0 cost and next is itself.
+            plpy.execute(""" INSERT INTO {out_table_2}
+                SELECT {grp_comma} {vertex_id} AS {src}, {vertex_id} AS {dest},
+                    0 AS {weight}, {vertex_id} AS parent
+                FROM {tmp_view} """.format(**locals()))
+            # Distance = 1: every edge means there is a path from src to dest
+            plpy.execute(""" INSERT INTO {out_table_2}
+                SELECT {grp_comma} {src}, {dest}, {weight}, {dest} AS parent
+                FROM {edge_table} """.format(**locals()))
+
+            # Fill the rest of the possible pairs with infinite initial weights
+            fill_sql = """ INSERT INTO {out_table_1}
+                SELECT {grp_v1_comma}
+                    v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM {tmp_view} v1, {tmp_view} v2
+                WHERE NOT EXISTS
+                    (SELECT 1 FROM {out_table_2}
+                        WHERE v1.{vertex_id} = {src} AND
+                            v2.{vertex_id} = {dest}
+                            {checkg_vv} {checkg_o2v})
+                    {checkg_vv}
+                UNION
+                SELECT * FROM {out_table_2}
+                """.format(**locals())
+            plpy.execute(fill_sql)
+
+            ot_sql1 = ot_sql.format(out_table=out_table_1, init_w=init_w,
+                                    weight=weight, src=src, dest=dest)
+            ot_sql2 = ot_sql.format(out_table=out_table_2, init_w=init_w,
+                                    weight=weight, src=src, dest=dest)
+
+        # PostgreSQL & GPDB initialization
+        else:
+
+            plpy.execute(""" CREATE TABLE {out_table} AS
+                (SELECT {grp_comma} {src}, {dest}, {weight},
+                    {src} AS parent FROM {edge_table} LIMIT 0)
+                {distribution} """.format(**locals()))
+
+            plpy.execute(""" INSERT INTO {out_table}
+                SELECT {grp_v1_comma}
+                    v1.{vertex_id} AS {src}, v2.{vertex_id} AS {dest},
+                    {init_w} AS {weight}, NULL::INT AS parent
+                FROM
+                    {tmp_view} AS v1 INNER JOIN
+                    {tmp_view} AS v2 ON ({checkg_vv_sub})
+                    """.format(**locals()))
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+            # GPDB and HAWQ have distributed by clauses to help them with 
indexing.
+            # For Postgres we add the indices manually.
+            if is_platform_pg():
+                sql_index = "CREATE INDEX ON {0}({1})".format(out_table, src)
+            else:
+                sql_index = ''
+
+            plpy.execute(sql_index)
+
+            # The source can be reached with 0 cost and next is itself.
+            plpy.execute(
+                """ UPDATE {out_table} SET
+                {weight} = 0, parent = {vertex_id}
+                FROM {vertex_table}
+                WHERE {out_table}.{src} = {vertex_id}
+                AND {out_table}.{dest} = {vertex_id}
+                """.format(**locals()))
+
+            # Distance = 1: every edge means there is a path from src to dest
+
+            # There may be multiple edges defined as a->b,
+            # we only need the minimum weighted one.
+
+            plpy.execute(
+                """ CREATE VIEW {tmp_view} AS
+                    SELECT {grp_comma} {src}, {dest},
+                        min({weight}) AS {weight}
+                    FROM {edge_table}
+                    GROUP BY {grp_comma} {src}, {dest}
+                """.format(**locals()))
+            plpy.execute(
+                """ UPDATE {out_table} SET
+                {weight} = {tmp_view}.{weight}, parent = {tmp_view}.{dest}
+                FROM {tmp_view}
+                WHERE {out_table}.{src} = {tmp_view}.{src}
+                AND {out_table}.{dest} = {tmp_view}.{dest}
+                AND {out_table}.{weight} > {tmp_view}.{weight} {checkg_ot}
+                """.format(**locals()))
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+            ot_sql1 = ot_sql.format(**locals())
+            out_table_1 = out_table
+
+        # Find the maximum number of iterations to try
+        # If not done by v_cnt iterations, there is a negative cycle.
+        v_cnt = plpy.execute(
+            """ SELECT max(count) as max FROM (
+                    SELECT count(DISTINCT {src}) AS count
+                    FROM {out_table_1}
+                    {grp_by}) x
+            """.format(**locals()))[0]['max']
+
+        if v_cnt < 2:
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                         format(out_table, out_table + "_summary"))
+            if is_hawq:
+                plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                    out_table_1, out_table_2))
+            if grouping_cols:
+                plpy.error(("Graph APSP: {0} has less than 2 vertices in " +
+                            "every group.").format(edge_table))
+            else:
+                plpy.error("Graph APSP: {0} has less than 2 vertices.".format(
+                    vertex_table))
+
+        for i in range(0, v_cnt + 1):
+
+            """
+            Create a view that will be used to update the output table
+
+            The implementation is based on the matrix multiplication idea.
+            The initial output table consists of 3 sets of vertex pairs.
+            1) for every vervex v: v -> v, weight 0, parent v
+            2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
+            3) for every other vertex pair v1,v2: v1 -> v2, weight 'Inf',
+                parent NULL
+
+            The algorithm "relaxes" the paths: finds alternate paths with less
+            weights
+            At every step, we look at every combination of non-infinite
+            existing paths and edges to see if we can relax a path.
+
+            Assume the graph is a chain: 1->2->3->...
+            The initial output table will have a finite weighted path for 1->2
+            and infinite for the rest. At ith iteration, the output table will
+            have 1->i path and will relax 1->i+1 path from infinite to a
+            finite value (weight of 1->i path + weight of i->i+1 edge) and
+            assign i as the parent.
+
+            Since using '=' with floats is dangerous we use an epsilon value
+            for comparison.
+            """
+
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            update_sql = """ CREATE VIEW {tmp_view} AS
+                SELECT DISTINCT ON ({grp_o_comma} y.{src}, y.{dest})
+                    {grp_o_comma} y.{src}, y.{dest},y.{weight}, y.parent
+                FROM  {out_table_1} AS out
+                INNER JOIN
+                    (SELECT x.{src}, x.{dest}, x.{weight},
+                        out.{dest} as parent {comma_grp_e}
+                    FROM
+                        ({ot_sql1}) AS out
+                    INNER JOIN
+                        {edge_table} AS edge
+                    ON (out.{dest} = edge.{src} {checkg_eout})
+                    INNER JOIN
+                        (SELECT out.{src}, edge.{dest},
+                            min(out.{weight}+edge.{weight}) AS {weight}
+                            {comma_grp_e}
+                        FROM
+                            ({ot_sql1}) AS out,
+                            {edge_table} AS edge
+                        WHERE out.{dest} = edge.{src} {checkg_eout}
+                        GROUP BY out.{src},edge.{dest} {comma_grp_e}) x
+                    ON (x.{src} = out.{src} AND x.{dest} = edge.{dest} 
{checkg_ex})
+                    WHERE ABS(out.{weight}+edge.{weight} - x.{weight})
+                        < {EPSILON}) y
+                ON (y.{src} = out.{src} AND y.{dest} = out.{dest} {checkg_oy})
+                WHERE y.{weight} < out.{weight}
+                """.format(**locals())
+            plpy.execute(update_sql)
+
+            # HAWQ employs alternating tables and has to be handled separately
+            if is_hawq:
+
+                # Stop if therea re no more updates
+                if table_is_empty(tmp_view):
+
+                    plpy.execute("DROP VIEW {0}".format(tmp_view))
+                    plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(
+                        out_table_1, out_table))
+                    break
+
+                # The new output table will still have the old values for
+                # every vertex pair that does not appear on the update view.
+
+                plpy.execute("TRUNCATE TABLE {0}".format(out_table_2))
+                fill_sql = """ INSERT INTO {out_table_2}
+                    SELECT * FROM {out_table_1} AS out WHERE NOT EXISTS
+                        (SELECT 1 FROM {tmp_view} AS t
+                        WHERE t.{src} = out.{src} AND
+                            t.{dest} = out.{dest} {checkg_o1t})
+                    UNION
+                    SELECT * FROM {tmp_view}""".format(**locals())
+                plpy.execute(fill_sql)
+
+                # Swap the table names and the sql command we use for filtering
+                tmpname = out_table_1
+                out_table_1 = out_table_2
+                out_table_2 = tmpname
+
+                tmpname = ot_sql1
+                ot_sql1 = ot_sql2
+                ot_sql2 = tmpname
+
+            else:
+
+                updates = plpy.execute(
+                    """ UPDATE {out_table}
+                        SET {weight} = {tmp_view}.{weight},
+                            parent = {tmp_view}.parent
+                        FROM {tmp_view}
+                        WHERE {tmp_view}.{src} = {out_table}.{src} AND
+                            {tmp_view}.{dest} = {out_table}.{dest} {checkg_ot}
+                    """.format(**locals()))
+                if updates.nrows() == 0:
+                    break
+
+        # The algorithm should have reached a break command by this point.
+        # This check handles the existence of a negative cycle.
+        if i == v_cnt:
+
+            # If there are no groups, clean up and give error.
+            if grouping_cols is None:
+                plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+                plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                             format(out_table, out_table + "_summary"))
+                if is_hawq:
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                        out_table_1, out_table_2))
+                plpy.error("Graph APSP: Detected a negative cycle in the 
graph.")
+
+            # It is possible that not all groups has negative cycles.
+            else:
+                # negs is the string created by collating grouping columns.
+                # By looking at the update view, we can see which groups
+                # are in a negative cycle.
+
+                negs = plpy.execute(
+                    """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
+                        FROM {tmp_view}
+                    """.format(**locals()))[0]['grp']
+
+                # Delete the groups with negative cycles from the output table.
+
+                # HAWQ doesn't support DELETE so we have to copy the valid 
results.
+                if is_hawq:
+                    sql_del = """ CREATE TABLE {out_table} AS
+                        SELECT *
+                        FROM {out_table_1} AS out
+                        WHERE NOT EXISTS(
+                            SELECT 1
+                            FROM {tmp_view}
+                            WHERE {checkg_o1t_sub}
+                            );"""
+                    plpy.execute(sql_del.format(**locals()))
+                    plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".format(
+                        out_table_1, out_table_2))
+                else:
+                    sql_del = """ DELETE FROM {out_table}
+                        USING {tmp_view}
+                        WHERE {checkg_ot_sub}"""
+                    plpy.execute(sql_del.format(**locals()))
+
+                plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+
+                # If every group has a negative cycle,
+                # drop the output table as well.
+                if table_is_empty(out_table):
+                    plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                                 format(out_table, out_table + "_summary"))
+                plpy.warning(
+                    """Graph APSP: Detected a negative cycle in the """ +
+                    """sub-graphs of following groups: {0}.""".
+                    format(str(negs)[1:-1]))
+
+        else:
+            plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
+            if is_hawq:
+                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_table_2))
+
+    return None
 
-       with MinWarning("warning"):
-
-               INT_MAX = 2147483647
-               INFINITY = "'Infinity'"
-               EPSILON = 0.000001
-
-               params_types = {'src': str, 'dest': str, 'weight': str}
-               default_args = {'src': 'src', 'dest': 'dest', 'weight': 
'weight'}
-               edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-               # Prepare the input for recording in the summary table
-               if vertex_id is None:
-                       v_st= "NULL"
-                       vertex_id = "id"
-               else:
-                       v_st = vertex_id
-               if edge_args is None:
-                       e_st = "NULL"
-               else:
-                       e_st = edge_args
-               if grouping_cols is None:
-                       g_st = "NULL"
-                       glist = None
-               else:
-                       g_st = grouping_cols
-                       glist = split_quoted_delimited_str(grouping_cols)
-
-               src = edge_params["src"]
-               dest = edge_params["dest"]
-               weight = edge_params["weight"]
-
-               distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
-                       <!"DISTRIBUTED BY ({0})".format(src)!>)
-
-               is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
-
-               _validate_apsp(vertex_table, vertex_id, edge_table,
-                       edge_params, out_table, glist)
-
-               out_table_1 = unique_string(desp='out_table_1')
-               out_table_2 = unique_string(desp='out_table_2')
-               tmp_view = unique_string(desp='tmp_view')
-               v1 = unique_string(desp='v1')
-               v2 = unique_string(desp='v2')
-               message = unique_string(desp='message')
-
-               # Initialize grouping related variables
-               comma_grp = ""
-               comma_grp_e = ""
-               comma_grp_m = ""
-               grp_comma = ""
-               grp_v1_comma = ""
-               grp_o1_comma = ""
-               grp_o_comma = ""
-               checkg_eo = ""
-               checkg_eout = ""
-               checkg_ex = ""
-               checkg_om = ""
-               checkg_o1t_sub = ""
-               checkg_ot_sub = ""
-               checkg_ot = ""
-               checkg_o1t = ""
-               checkg_vv = ""
-               checkg_o2v = ""
-               checkg_oy = ""
-               checkg_vv_sub = "TRUE"
-               grp_by = ""
-
-               if grouping_cols is not None:
-
-                       # We use actual table names in some cases and aliases 
in others
-                       # In some cases, we swap the table names so use of an 
alias is
-                       # necessary. In other cases, they are used to simplify 
debugging.
-
-                       comma_grp = " , " + grouping_cols
-                       comma_grp_e = " , " + _grp_from_table("edge",glist)
-                       comma_grp_m = " , " + _grp_from_table(message,glist)
-                       grp_comma = grouping_cols + " , "
-                       grp_v1_comma = _grp_from_table("v1",glist) + " , "
-                       grp_o1_comma = _grp_from_table(out_table_1,glist) + " , 
"
-                       grp_o_comma = _grp_from_table("out",glist) + " , "
-
-                       checkg_eo = " AND " + 
_check_groups(edge_table,out_table,glist)
-                       checkg_eout = " AND " + 
_check_groups("edge","out",glist)
-                       checkg_ex = " AND " + _check_groups("edge","x",glist)
-                       checkg_om = " AND " + 
_check_groups(out_table,message,glist)
-                       checkg_o1t_sub = _check_groups("out",tmp_view,glist)
-                       checkg_ot_sub = _check_groups(out_table,tmp_view,glist)
-                       checkg_ot = " AND " + 
_check_groups(out_table,tmp_view,glist)
-                       checkg_o1t = " AND " + _check_groups("out","t",glist)
-                       checkg_vv = " AND " + _check_groups("v1","v2",glist)
-                       checkg_o2v = " AND " + 
_check_groups(out_table_2,"v2",glist)
-                       checkg_oy = " AND " + _check_groups("out","y",glist)
-                       checkg_vv_sub = _check_groups("v1","v2",glist)
-                       grp_by = " GROUP BY " + grouping_cols
-
-               w_type = get_expr_type(weight,edge_table).lower()
-               init_w = INT_MAX
-               if w_type in ['real','double precision','float8']:
-                       init_w = INFINITY
-
-               # We keep a summary table to keep track of the parameters used 
for this
-               # APSP run. This table is used in the path finding function to 
eliminate
-               # the need for repetition.
-               plpy.execute( """ CREATE TABLE {out_table}_summary  (
-                       vertex_table            TEXT,
-                       vertex_id               TEXT,
-                       edge_table              TEXT,
-                       edge_args               TEXT,
-                       out_table               TEXT,
-                       grouping_cols           TEXT)
-                       """.format(**locals()))
-               plpy.execute( """ INSERT INTO {out_table}_summary VALUES
-                       ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
-                       '{out_table}', '{g_st}') """.format(**locals()))
-
-               plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-               # Find all of the vertices involved with a given group
-               plpy.execute(""" CREATE VIEW {tmp_view} AS
-                       SELECT {src} AS {vertex_id} {comma_grp}
-                       FROM {edge_table} WHERE {src} IS NOT NULL
-                       UNION
-                       SELECT {dest} AS {vertex_id} {comma_grp}
-                       FROM {edge_table} WHERE {dest} IS NOT NULL
-                       """.format(**locals()))
-
-               # Don't use the unnecessary rows of the output table during 
joins.
-               ot_sql = """ SELECT * FROM {out_table}
-                       WHERE {weight} != {init_w} AND {src} != {dest} """
-
-               # HAWQ does not support UPDATE so the initialization has to 
differ.
-               if is_hawq:
-
-                       plpy.execute(" DROP TABLE IF EXISTS {0},{1}".format(
-                               out_table_1,out_table_2))
-                       # Create 2 identical tables to swap at every iteration.
-                       plpy.execute(""" CREATE TABLE {out_table_1} AS
-                               SELECT {grp_comma} {src},{dest},{weight}, 
NULL::INT AS parent
-                               FROM {edge_table} LIMIT 0 {distribution}
-                               """.format(**locals()))
-                       plpy.execute(""" CREATE TABLE {out_table_2} AS
-                               SELECT * FROM {out_table_1} LIMIT 0 
{distribution}
-                               """.format(**locals()))
-
-                       # The source can be reached with 0 cost and next is 
itself.
-                       plpy.execute(""" INSERT INTO {out_table_2}
-                               SELECT {grp_comma} {vertex_id} AS {src}, 
{vertex_id} AS {dest},
-                                       0 AS {weight}, {vertex_id} AS parent
-                               FROM {tmp_view} """.format(**locals()))
-                       # Distance = 1: every edge means there is a path from 
src to dest
-                       plpy.execute(""" INSERT INTO {out_table_2}
-                               SELECT {grp_comma} {src}, {dest}, {weight}, 
{dest} AS parent
-                               FROM {edge_table} """.format(**locals()))
-
-                       # Fill the rest of the possible pairs with infinite 
initial weights
-                       fill_sql = """ INSERT INTO {out_table_1}
-                               SELECT {grp_v1_comma}
-                                       v1.{vertex_id} AS {src}, v2.{vertex_id} 
AS {dest},
-                                       {init_w} AS {weight}, NULL::INT AS 
parent
-                               FROM {tmp_view} v1, {tmp_view} v2
-                               WHERE NOT EXISTS
-                                       (SELECT 1 FROM {out_table_2}
-                                               WHERE v1.{vertex_id} = {src} AND
-                                                       v2.{vertex_id} = {dest}
-                                                       {checkg_vv} 
{checkg_o2v})
-                                       {checkg_vv}
-                               UNION
-                               SELECT * FROM {out_table_2}
-                               """.format(**locals())
-                       plpy.execute(fill_sql)
-
-                       ot_sql1 = ot_sql.format(out_table = out_table_1, init_w 
= init_w,
-                               weight = weight, src = src, dest = dest)
-                       ot_sql2 = ot_sql.format(out_table = out_table_2, init_w 
= init_w,
-                               weight = weight, src = src, dest = dest)
-
-               # PostgreSQL & GPDB initialization
-               else:
-
-                       plpy.execute( """ CREATE TABLE {out_table} AS
-                               (SELECT {grp_comma} {src}, {dest}, {weight},
-                                       {src} AS parent FROM {edge_table} LIMIT 
0)
-                               {distribution} """.format(**locals()))
-
-                       plpy.execute( """ INSERT INTO {out_table}
-                               SELECT {grp_v1_comma}
-                                       v1.{vertex_id} AS {src}, v2.{vertex_id} 
AS {dest},
-                                       {init_w} AS {weight}, NULL::INT AS 
parent
-                               FROM
-                                       {tmp_view} AS v1 INNER JOIN
-                                       {tmp_view} AS v2 ON ({checkg_vv_sub})
-                                       """.format(**locals()))
-                       plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-                       # GPDB and HAWQ have distributed by clauses to help 
them with indexing.
-                       # For Postgres we add the indices manually.
-                       sql_index = m4_ifdef(<!__POSTGRESQL__!>,
-                               <!""" CREATE INDEX ON {out_table} ({src});
-                               """.format(**locals())!>, <!''!>)
-                       plpy.execute(sql_index)
-
-                       # The source can be reached with 0 cost and next is 
itself.
-                       plpy.execute(
-                               """ UPDATE {out_table} SET
-                               {weight} = 0, parent = {vertex_id}
-                               FROM {vertex_table}
-                               WHERE {out_table}.{src} = {vertex_id}
-                               AND {out_table}.{dest} = {vertex_id}
-                               """.format(**locals()))
-
-                       # Distance = 1: every edge means there is a path from 
src to dest
-
-                       # There may be multiple edges defined as a->b,
-                       # we only need the minimum weighted one.
-
-                       plpy.execute(
-                               """ CREATE VIEW {tmp_view} AS
-                                       SELECT {grp_comma} {src}, {dest},
-                                               min({weight}) AS {weight}
-                                       FROM {edge_table}
-                                       GROUP BY {grp_comma} {src}, {dest}
-                               """.format(**locals()))
-                       plpy.execute(
-                               """ UPDATE {out_table} SET
-                               {weight} = {tmp_view}.{weight}, parent = 
{tmp_view}.{dest}
-                               FROM {tmp_view}
-                               WHERE {out_table}.{src} = {tmp_view}.{src}
-                               AND {out_table}.{dest} = {tmp_view}.{dest}
-                               AND {out_table}.{weight} > {tmp_view}.{weight} 
{checkg_ot}
-                               """.format(**locals()))
-                       plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-
-                       ot_sql1 = ot_sql.format(**locals())
-                       out_table_1 = out_table
-
-               # Find the maximum number of iterations to try
-               # If not done by v_cnt iterations, there is a negative cycle.
-               v_cnt = plpy.execute(
-                       """ SELECT max(count) as max FROM (
-                                       SELECT count(DISTINCT {src}) AS count
-                                       FROM {out_table_1}
-                                       {grp_by}) x
-                       """.format(**locals()))[0]['max']
-
-               if v_cnt < 2:
-                       plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-                       plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-                                       format(out_table, out_table+"_summary"))
-                       if is_hawq:
-                               plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".format(
-                                       out_table_1,out_table_2))
-                       if grouping_cols:
-                               plpy.error(("Graph APSP: {0} has less than 2 
vertices in "+
-                                       "every group.").format(edge_table))
-                       else:
-                               plpy.error("Graph APSP: {0} has less than 2 
vertices.".format(
-                                       vertex_table))
-
-               for i in range(0,v_cnt+1):
-
-                       """
-                       Create a view that will be used to update the output 
table
-
-                       The implementation is based on the matrix 
multiplication idea.
-                       The initial output table consists of 3 sets of vertex 
pairs.
-                       1) for every vervex v: v -> v, weight 0, parent v
-                       2) for every edge v1,v2,w: v1 -> v2, weight w, parent v2
-                       3) for every other vertex pair v1,v2: v1 -> v2, weight 
'Inf',
-                               parent NULL
-
-                       The algorithm "relaxes" the paths: finds alternate 
paths with less
-                       weights
-                       At every step, we look at every combination of 
non-infinite
-                       existing paths and edges to see if we can relax a path.
-
-                       Assume the graph is a chain: 1->2->3->...
-                       The initial output table will have a finite weighted 
path for 1->2
-                       and infinite for the rest. At ith iteration, the output 
table will
-                       have 1->i path and will relax 1->i+1 path from infinite 
to a
-                       finite value (weight of 1->i path + weight of i->i+1 
edge) and
-                       assign i as the parent.
-
-                       Since using '=' with floats is dangerous we use an 
epsilon value
-                       for comparison.
-                       """
-
-                       plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-                       update_sql = """ CREATE VIEW {tmp_view} AS
-                               SELECT DISTINCT ON ({grp_o_comma} y.{src}, 
y.{dest})
-                                       {grp_o_comma} y.{src}, 
y.{dest},y.{weight}, y.parent
-                               FROM  {out_table_1} AS out
-                               INNER JOIN
-                                       (SELECT x.{src}, x.{dest}, x.{weight},
-                                               out.{dest} as parent 
{comma_grp_e}
-                                       FROM
-                                               ({ot_sql1}) AS out
-                                       INNER JOIN
-                                               {edge_table} AS edge
-                                       ON (out.{dest} = edge.{src} 
{checkg_eout})
-                                       INNER JOIN
-                                               (SELECT out.{src}, edge.{dest},
-                                                       
min(out.{weight}+edge.{weight}) AS {weight}
-                                                       {comma_grp_e}
-                                               FROM
-                                                       ({ot_sql1}) AS out,
-                                                       {edge_table} AS edge
-                                               WHERE out.{dest} = edge.{src} 
{checkg_eout}
-                                               GROUP BY out.{src},edge.{dest} 
{comma_grp_e}) x
-                                       ON (x.{src} = out.{src} AND x.{dest} = 
edge.{dest} {checkg_ex})
-                                       WHERE ABS(out.{weight}+edge.{weight} - 
x.{weight})
-                                               < {EPSILON}) y
-                               ON (y.{src} = out.{src} AND y.{dest} = 
out.{dest} {checkg_oy})
-                               WHERE y.{weight} < out.{weight}
-                               """.format(**locals())
-                       plpy.execute(update_sql)
-
-                       # HAWQ employs alternating tables and has to be handled 
separately
-                       if is_hawq:
-
-                               # Stop if therea re no more updates
-                               if table_is_empty(tmp_view):
-
-                                       plpy.execute("DROP VIEW 
{0}".format(tmp_view))
-                                       plpy.execute("ALTER TABLE {0} RENAME TO 
{1}".format(
-                                               out_table_1,out_table))
-                                       break
-
-                               # The new output table will still have the old 
values for
-                               # every vertex pair that does not appear on the 
update view.
-
-                               plpy.execute("TRUNCATE TABLE 
{0}".format(out_table_2))
-                               fill_sql = """ INSERT INTO {out_table_2}
-                                       SELECT * FROM {out_table_1} AS out 
WHERE NOT EXISTS
-                                               (SELECT 1 FROM {tmp_view} AS t
-                                               WHERE t.{src} = out.{src} AND
-                                                       t.{dest} = out.{dest} 
{checkg_o1t})
-                                       UNION
-                                       SELECT * FROM 
{tmp_view}""".format(**locals())
-                               plpy.execute(fill_sql)
-
-                               # Swap the table names and the sql command we 
use for filtering
-                               tmpname = out_table_1
-                               out_table_1 = out_table_2
-                               out_table_2 = tmpname
-
-                               tmpname = ot_sql1
-                               ot_sql1 = ot_sql2
-                               ot_sql2 = tmpname
-
-                       else:
-
-                               updates = plpy.execute(
-                                       """     UPDATE {out_table}
-                                               SET {weight} = 
{tmp_view}.{weight},
-                                                       parent = 
{tmp_view}.parent
-                                               FROM {tmp_view}
-                                               WHERE {tmp_view}.{src} = 
{out_table}.{src} AND
-                                                       {tmp_view}.{dest} = 
{out_table}.{dest} {checkg_ot}
-                                       """.format(**locals()))
-                               if updates.nrows() == 0:
-                                       break
-
-               # The algorithm should have reached a break command by this 
point.
-               # This check handles the existence of a negative cycle.
-               if i == v_cnt:
-
-                       # If there are no groups, clean up and give error.
-                       if grouping_cols is None:
-                               plpy.execute("DROP VIEW IF EXISTS 
{0}".format(tmp_view))
-                               plpy.execute("DROP TABLE IF EXISTS {0},{1}".
-                                       format(out_table, out_table+"_summary"))
-                               if is_hawq:
-                                       plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".format(
-                                               out_table_1,out_table_2))
-                               plpy.error("Graph APSP: Detected a negative 
cycle in the graph.")
-
-                       # It is possible that not all groups has negative 
cycles.
-                       else:
-                               # negs is the string created by collating 
grouping columns.
-                               # By looking at the update view, we can see 
which groups
-                               # are in a negative cycle.
-
-                               negs = plpy.execute(
-                                       """ SELECT array_agg(DISTINCT 
({grouping_cols})) AS grp
-                                               FROM {tmp_view}
-                                       """.format(**locals()))[0]['grp']
-
-                               # Delete the groups with negative cycles from 
the output table.
-
-                               # HAWQ doesn't support DELETE so we have to 
copy the valid results.
-                               if is_hawq:
-                                       sql_del = """ CREATE TABLE {out_table} 
AS
-                                               SELECT *
-                                               FROM {out_table_1} AS out
-                                               WHERE NOT EXISTS(
-                                                       SELECT 1
-                                                       FROM {tmp_view}
-                                                       WHERE {checkg_o1t_sub}
-                                                       );"""
-                                       plpy.execute(sql_del.format(**locals()))
-                                       plpy.execute("DROP VIEW IF EXISTS 
{0}".format(tmp_view))
-                                       plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".format(
-                                               out_table_1,out_table_2))
-                               else:
-                                       sql_del = """ DELETE FROM {out_table}
-                                               USING {tmp_view}
-                                               WHERE {checkg_ot_sub}"""
-                                       plpy.execute(sql_del.format(**locals()))
-
-                               plpy.execute("DROP VIEW IF EXISTS 
{0}".format(tmp_view))
-
-                               # If every group has a negative cycle,
-                               # drop the output table as well.
-                               if table_is_empty(out_table):
-                                       plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".
-                                               
format(out_table,out_table+"_summary"))
-                               plpy.warning(
-                                       """Graph APSP: Detected a negative 
cycle in the """ +
-                                       """sub-graphs of following groups: 
{0}.""".
-                                       format(str(negs)[1:-1]))
-
-               else:
-                       plpy.execute("DROP VIEW IF EXISTS {0}".format(tmp_view))
-                       if is_hawq:
-                               plpy.execute("DROP TABLE IF EXISTS 
{0}".format(out_table_2))
-
-       return None
 
 def graph_apsp_get_path(schema_madlib, apsp_table,
-       source_vertex, dest_vertex, path_table, **kwargs):
-       """
-       Helper function that can be used to get the shortest path between any 2
-       vertices
+                        source_vertex, dest_vertex, path_table, **kwargs):
+    """
+    Helper function that can be used to get the shortest path between any 2
+    vertices
     Args:
-       @param apsp_table               Name of the table that contains the APSP
-                                                       output.
-        @param source_vertex   The vertex that will be the source of the
-                                               desired path.
-        @param dest_vertex             The vertex that will be the destination 
of the
-                                               desired path.
-       """
-
-       with MinWarning("warning"):
-               _validate_get_path(apsp_table, source_vertex, dest_vertex, 
path_table)
-
-               temp1_name = unique_string(desp='temp1')
-               temp2_name = unique_string(desp='temp2')
-
-               summary = plpy.execute("SELECT * FROM 
{0}_summary".format(apsp_table))
-               vertex_id = summary[0]['vertex_id']
-               edge_args = summary[0]['edge_args']
-               grouping_cols = summary[0]['grouping_cols']
-
-               params_types = {'src': str, 'dest': str, 'weight': str}
-               default_args = {'src': 'src', 'dest': 'dest', 'weight': 
'weight'}
-               edge_params = extract_keyvalue_params(edge_args,
-                                            params_types,
-                                            default_args)
-
-               src = edge_params["src"]
-               dest = edge_params["dest"]
-               weight = edge_params["weight"]
-
-               if vertex_id == "NULL":
-                       vertex_id = "id"
-
-               if grouping_cols == "NULL":
-                       grouping_cols = None
-
-               select_grps = ""
-               check_grps_t1 = ""
-               check_grps_t2 = ""
-               grp_comma = ""
-               tmp = ""
-
-               if grouping_cols is not None:
-                       glist = split_quoted_delimited_str(grouping_cols)
-                       select_grps = _grp_from_table(apsp_table,glist) + " , "
-                       check_grps_t1 = " AND " + _check_groups(
-                               apsp_table,temp1_name,glist)
-                       check_grps_t2 = " AND " + _check_groups(
-                               apsp_table,temp2_name,glist)
-
-                       grp_comma = grouping_cols + " , "
-
-               # If the source and destination is the same vertex.
-               # There is no need to check the paths for any group.
-               if source_vertex == dest_vertex:
-                       plpy.execute("""
-                               CREATE TABLE {path_table} AS
-                               SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] 
AS path
-                               FROM {apsp_table} WHERE {src} = {source_vertex} 
AND
-                                       {dest} = {dest_vertex}
-                               """.format(**locals()))
-                       return
-
-               plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
-                       format(temp1_name,temp2_name));
-
-               # Initialize the temporary tables
-               out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
-                               SELECT {grp_comma} {apsp_table}.parent AS 
{vertex_id},
-                                       ARRAY[{dest_vertex}] AS path
-                               FROM {apsp_table}
-                               WHERE {src} = {source_vertex} AND {dest} = 
{dest_vertex}
-                                       AND {apsp_table}.parent IS NOT NULL
-                       """.format(**locals()))
-
-               plpy.execute("""
-                       CREATE TEMP TABLE {temp2_name} AS
-                               SELECT * FROM {temp1_name} LIMIT 0
-                       """.format(**locals()))
-
-               # Follow the 'parent' chain until you reach the case where the 
parent
-               # is the same as destination. This means it is the last vertex 
before
-               # the source.
-               while out.nrows() > 0:
-
-                       plpy.execute("TRUNCATE TABLE 
{temp2_name}".format(**locals()))
-                       # If the parent id is not the same as dest,
-                       # that means we have to follow the chain:
-                       #       Add it to the path and move to its parent
-                       out = plpy.execute(
-                               """ INSERT INTO {temp2_name}
-                               SELECT {select_grps} {apsp_table}.parent AS 
{vertex_id},
-                                       {apsp_table}.{dest} || 
{temp1_name}.path AS path
-                               FROM {apsp_table} INNER JOIN {temp1_name} ON
-                                       ({apsp_table}.{dest} = 
{temp1_name}.{vertex_id}
-                                               {check_grps_t1})
-                               WHERE {src} = {source_vertex} AND
-                                       {apsp_table}.parent <> 
{apsp_table}.{dest}
-                               """.format(**locals()))
-
-                       tmp = temp2_name
-                       temp2_name = temp1_name
-                       temp1_name = tmp
-
-                       tmp = check_grps_t1
-                       check_grps_t1 = check_grps_t2
-                       check_grps_t2 = tmp
-
-               # We have to consider 3 cases.
-               # 1) The path has more than 2 vertices:
-               #       Add the current parent and the source vertex
-               # 2) The path has exactly 2 vertices (an edge between src and 
dest is
-               # the shortest path).
-               #       Add the source vertex
-               # 3) The path has 0 vertices (unreachable)
-               #       Add an empty array.
-
-               # Path with 1 vertex (src == dest) has been handled before
-               plpy.execute("""
-                       CREATE TABLE {path_table} AS
-                       SELECT {grp_comma} {source_vertex} || ({vertex_id} || 
path) AS path
-                       FROM {temp2_name}
-                       WHERE {vertex_id} <> {dest_vertex}
-                       UNION
-                       SELECT {grp_comma} {source_vertex} || path AS path
-                       FROM {temp2_name}
-                       WHERE {vertex_id} = {dest_vertex}
-                       UNION
-                       SELECT {grp_comma} '{{}}'::INT[] AS path
-                       FROM {apsp_table}
-                       WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
-                               AND {apsp_table}.parent IS NULL
-                       """.format(**locals()))
-
-               out = plpy.execute("SELECT 1 FROM {0} LIMIT 
1".format(path_table))
-
-               if out.nrows() == 0:
-                       plpy.error( ("Graph APSP: Vertex {0} and/or {1} is not 
present"+
-                               " in the APSP table {1}").format(
-                               source_vertex,dest_vertex,apsp_table))
-
-               plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
-                       format(**locals()))
-
-       return None
+        @param apsp_table       Name of the table that contains the APSP
+                                output.
+        @param source_vertex    The vertex that will be the source of the
+                                desired path.
+        @param dest_vertex      The vertex that will be the destination of the
+                                desired path.
+    """
+
+    with MinWarning("warning"):
+        _validate_get_path(apsp_table, source_vertex, dest_vertex, path_table)
+
+        temp1_name = unique_string(desp='temp1')
+        temp2_name = unique_string(desp='temp2')
+
+        summary = plpy.execute("SELECT * FROM {0}_summary".format(apsp_table))
+        vertex_id = summary[0]['vertex_id']
+        edge_args = summary[0]['edge_args']
+        grouping_cols = summary[0]['grouping_cols']
+
+        params_types = {'src': str, 'dest': str, 'weight': str}
+        default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'}
+        edge_params = extract_keyvalue_params(edge_args,
+                                              params_types,
+                                              default_args)
+
+        src = edge_params["src"]
+        dest = edge_params["dest"]
+        weight = edge_params["weight"]
+
+        if vertex_id == "NULL":
+            vertex_id = "id"
+
+        if grouping_cols == "NULL":
+            grouping_cols = None
+
+        select_grps = ""
+        check_grps_t1 = ""
+        check_grps_t2 = ""
+        grp_comma = ""
+        tmp = ""
+
+        if grouping_cols is not None:
+            glist = split_quoted_delimited_str(grouping_cols)
+            select_grps = _grp_from_table(apsp_table, glist) + " , "
+            check_grps_t1 = " AND " + _check_groups(
+                apsp_table, temp1_name, glist)
+            check_grps_t2 = " AND " + _check_groups(
+                apsp_table, temp2_name, glist)
+
+            grp_comma = grouping_cols + " , "
+
+        # If the source and destination is the same vertex.
+        # There is no need to check the paths for any group.
+        if source_vertex == dest_vertex:
+            plpy.execute("""
+                CREATE TABLE {path_table} AS
+                SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path
+                FROM {apsp_table} WHERE {src} = {source_vertex} AND
+                    {dest} = {dest_vertex}
+                """.format(**locals()))
+            return
+
+        plpy.execute("DROP TABLE IF EXISTS {0},{1}".
+                     format(temp1_name, temp2_name))
+
+        # Initialize the temporary tables
+        out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                SELECT {grp_comma} {apsp_table}.parent AS {vertex_id},
+                    ARRAY[{dest_vertex}] AS path
+                FROM {apsp_table}
+                WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+                    AND {apsp_table}.parent IS NOT NULL
+            """.format(**locals()))
+
+        plpy.execute("""
+            CREATE TEMP TABLE {temp2_name} AS
+                SELECT * FROM {temp1_name} LIMIT 0
+            """.format(**locals()))
+
+        # Follow the 'parent' chain until you reach the case where the parent
+        # is the same as destination. This means it is the last vertex before
+        # the source.
+        while out.nrows() > 0:
+
+            plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals()))
+            # If the parent id is not the same as dest,
+            # that means we have to follow the chain:
+            #   Add it to the path and move to its parent
+            out = plpy.execute(
+                """ INSERT INTO {temp2_name}
+                SELECT {select_grps} {apsp_table}.parent AS {vertex_id},
+                    {apsp_table}.{dest} || {temp1_name}.path AS path
+                FROM {apsp_table} INNER JOIN {temp1_name} ON
+                    ({apsp_table}.{dest} = {temp1_name}.{vertex_id}
+                        {check_grps_t1})
+                WHERE {src} = {source_vertex} AND
+                    {apsp_table}.parent <> {apsp_table}.{dest}
+                """.format(**locals()))
+
+            tmp = temp2_name
+            temp2_name = temp1_name
+            temp1_name = tmp
+
+            tmp = check_grps_t1
+            check_grps_t1 = check_grps_t2
+            check_grps_t2 = tmp
+
+        # We have to consider 3 cases.
+        # 1) The path has more than 2 vertices:
+        #   Add the current parent and the source vertex
+        # 2) The path has exactly 2 vertices (an edge between src and dest is
+        # the shortest path).
+        #   Add the source vertex
+        # 3) The path has 0 vertices (unreachable)
+        #   Add an empty array.
+
+        # Path with 1 vertex (src == dest) has been handled before
+        plpy.execute("""
+            CREATE TABLE {path_table} AS
+            SELECT {grp_comma} {source_vertex} || ({vertex_id} || path) AS path
+            FROM {temp2_name}
+            WHERE {vertex_id} <> {dest_vertex}
+            UNION
+            SELECT {grp_comma} {source_vertex} || path AS path
+            FROM {temp2_name}
+            WHERE {vertex_id} = {dest_vertex}
+            UNION
+            SELECT {grp_comma} '{{}}'::INT[] AS path
+            FROM {apsp_table}
+            WHERE {src} = {source_vertex} AND {dest} = {dest_vertex}
+                AND {apsp_table}.parent IS NULL
+            """.format(**locals()))
+
+        out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table))
+
+        if out.nrows() == 0:
+            plpy.error(("Graph APSP: Vertex {0} and/or {1} is not present" +
+                        " in the APSP table {1}").format(
+                source_vertex, dest_vertex, apsp_table))
+
+        plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}".
+                     format(**locals()))
+
+    return None
+
 
 def _validate_apsp(vertex_table, vertex_id, edge_table, edge_params,
-       out_table, glist, **kwargs):
+                   out_table, glist, **kwargs):
+
+    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
+                          out_table, 'APSP')
 
-       validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-               out_table,'APSP')
+    vt_error = plpy.execute(
+        """ SELECT {vertex_id}
+            FROM {vertex_table}
+            WHERE {vertex_id} IS NOT NULL
+            GROUP BY {vertex_id}
+            HAVING count(*) > 1 """.format(**locals()))
 
-       vt_error = plpy.execute(
-               """ SELECT {vertex_id}
-                       FROM {vertex_table}
-                       WHERE {vertex_id} IS NOT NULL
-                       GROUP BY {vertex_id}
-                       HAVING count(*) > 1 """.format(**locals()))
+    if vt_error.nrows() != 0:
+        plpy.error(
+            """Graph APSP: Source vertex table {vertex_table} contains 
duplicate vertex id's.""".
+            format(**locals()))
 
-       if vt_error.nrows() != 0:
-               plpy.error(
-                       """Graph APSP: Source vertex table {vertex_table} 
contains duplicate vertex id's.""".
-                       format(**locals()))
+    _assert(not table_exists(out_table + "_summary"),
+            "Graph APSP: Output summary table already exists!")
 
-       _assert(not table_exists(out_table+"_summary"),
-               "Graph APSP: Output summary table already exists!")
+    if glist is not None:
+        _assert(columns_exist_in_table(edge_table, glist),
+                """Graph APSP: Not all columns from {glist} are present in 
edge table ({edge_table}).""".
+                format(**locals()))
 
-       if glist is not None:
-               _assert(columns_exist_in_table(edge_table, glist),
-                       """Graph APSP: Not all columns from {glist} are present 
in edge table ({edge_table}).""".
-                       format(**locals()))
 
 def _validate_get_path(apsp_table, source_vertex, dest_vertex,
-       path_table, **kwargs):
+                       path_table, **kwargs):
 
-       _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
-               "Graph APSP: Invalid APSP table name!")
-       _assert(table_exists(apsp_table),
-               "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
-       _assert(not table_is_empty(apsp_table),
-               "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
+    _assert(apsp_table and apsp_table.strip().lower() not in ('null', ''),
+            "Graph APSP: Invalid APSP table name!")
+    _assert(table_exists(apsp_table),
+            "Graph APSP: APSP table ({0}) is missing!".format(apsp_table))
+    _assert(not table_is_empty(apsp_table),
+            "Graph APSP: APSP table ({0}) is empty!".format(apsp_table))
 
-       summary = apsp_table+"_summary"
-       _assert(table_exists(summary),
-               "Graph APSP: APSP summary table ({0}) is 
missing!".format(summary))
-       _assert(not table_is_empty(summary),
-               "Graph APSP: APSP summary table ({0}) is 
empty!".format(summary))
+    summary = apsp_table + "_summary"
+    _assert(table_exists(summary),
+            "Graph APSP: APSP summary table ({0}) is missing!".format(summary))
+    _assert(not table_is_empty(summary),
+            "Graph APSP: APSP summary table ({0}) is empty!".format(summary))
 
-       _assert(not table_exists(path_table),
-               "Graph APSP: Output path table already exists!")
+    _assert(not table_exists(path_table),
+            "Graph APSP: Output path table already exists!")
+
+    return None
 
-       return None
 
 def graph_apsp_help(schema_madlib, message, **kwargs):
-       """
-       Help function for graph_apsp and graph_apsp_get_path
-
-       Args:
-               @param schema_madlib
-               @param message: string, Help message string
-               @param kwargs
-
-       Returns:
-           String. Help/usage information
-       """
-       if not message:
-               help_string = """
+    """
+    Help function for graph_apsp and graph_apsp_get_path
+
+    Args:
+        @param schema_madlib
+        @param message: string, Help message string
+        @param kwargs
+
+    Returns:
+        String. Help/usage information
+    """
+    if not message:
+        help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -717,8 +717,8 @@ edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_apsp('usage')
             """
-       elif message.lower() in ['usage', 'help', '?']:
-               help_string = """
+    elif message.lower() in ['usage', 'help', '?']:
+        help_string = """
 Given a graph, all pairs shortest path (apsp) algorithm finds a path for
 every vertex pair such that the sum of the weights of its constituent
 edges is minimized.
@@ -728,10 +728,10 @@ edges is minimized.
 To retrieve the path for a specific vertex pair:
 
  SELECT {schema_madlib}.graph_apsp_get_path(
-    apsp_table  TEXT, -- Name of the table that contains the apsp output.
+    apsp_table   TEXT, -- Name of the table that contains the apsp output.
     source_vertex INT, -- The vertex that will be the source of the
                        -- desired path.
-    dest_vertex          INT, -- The vertex that will be the destination of the
+    dest_vertex   INT, -- The vertex that will be the destination of the
                        -- desired path.
     path_table   TEXT  -- Name of the output table that contains the path.
 );
@@ -762,8 +762,8 @@ every group and has the following columns:
   - path (ARRAY)  : The shortest path from the source vertex to the
                   destination vertex.
 """
-       elif message.lower() in ("example", "examples"):
-               help_string = """
+    elif message.lower() in ("example", "examples"):
+        help_string = """
 ----------------------------------------------------------------------------
                                 EXAMPLES
 ----------------------------------------------------------------------------
@@ -806,11 +806,11 @@ INSERT INTO edge VALUES
 -- Compute the apsp:
 DROP TABLE IF EXISTS out;
 SELECT madlib.graph_apsp(
-       'vertex',                            -- Vertex table
-       'id',                                -- Vertix id column
-       'edge',                              -- Edge table
-       'src=src, dest=dest, weight=weight', -- Comma delimited string of edge 
arguments
-       'out'                                -- Output table of apsp
+    'vertex',                            -- Vertex table
+    'id',                                -- Vertix id column
+    'edge',                              -- Edge table
+    'src=src, dest=dest, weight=weight', -- Comma delimited string of edge 
arguments
+    'out'                                -- Output table of apsp
 );
 -- View the apsp costs for every vertex:
 SELECT * FROM out ORDER BY src, dest;
@@ -834,11 +834,11 @@ INSERT INTO edge_gr VALUES
 DROP TABLE IF EXISTS out_gr, out_gr_summary;
 SELECT graph_apsp('vertex',NULL,'edge_gr',NULL,'out_gr','grp');
 """
-       else:
-               help_string = "No such option. Use {schema_madlib}.graph_apsp()"
+    else:
+        help_string = "No such option. Use {schema_madlib}.graph_apsp()"
 
-       return help_string.format(schema_madlib=schema_madlib,
-               graph_usage=get_graph_usage(schema_madlib, 'graph_apsp',
-    """out_table     TEXT, -- Name of the table to store the result of apsp.
+    return help_string.format(schema_madlib=schema_madlib,
+                              graph_usage=get_graph_usage(schema_madlib, 
'graph_apsp',
+                                                          """out_table     
TEXT, -- Name of the table to store the result of apsp.
     grouping_cols TEXT  -- The list of grouping columns."""))
 # ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/graph_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in 
b/src/ports/postgres/modules/graph/graph_utils.py_in
index 944c301..9d31345 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -27,115 +27,115 @@
 @namespace graph
 """
 
-import plpy
-from utilities.control import MinWarning
 from utilities.utilities import _assert
-from utilities.utilities import extract_keyvalue_params
-from utilities.utilities import unique_string
 from utilities.validate_args import get_cols
 from utilities.validate_args import unquote_ident
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
 
+
 def _grp_null_checks(grp_list):
 
     """
-    Helper function for generating NULL checks for grouping columns 
+    Helper function for generating NULL checks for grouping columns
     to be used within a WHERE clause
     Args:
         @param grp_list   The list of grouping columns
     """
     return ' AND '.join([" {i} IS NOT NULL ".format(**locals())
-        for i in grp_list])
+                        for i in grp_list])
+
 
 def _check_groups(tbl1, tbl2, grp_list):
+    """
+    Helper function for joining tables with groups.
+    Args:
+            @param tbl1       Name of the first table
+            @param tbl2       Name of the second table
+            @param grp_list   The list of grouping columns
+    """
 
-       """
-       Helper function for joining tables with groups.
-       Args:
-               @param tbl1       Name of the first table
-               @param tbl2       Name of the second table
-               @param grp_list   The list of grouping columns
-       """
+    return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
+                         for i in grp_list])
 
-       return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
-               for i in grp_list])
 
 def _grp_from_table(tbl, grp_list):
+    """
+    Helper function for selecting grouping columns of a table
+    Args:
+            @param tbl        Name of the table
+            @param grp_list   The list of grouping columns
+    """
+    return ' , '.join([" {tbl}.{i} ".format(**locals())
+                       for i in grp_list])
 
-       """
-       Helper function for selecting grouping columns of a table
-       Args:
-               @param tbl        Name of the table
-               @param grp_list   The list of grouping columns
-       """
-       return ' , '.join([" {tbl}.{i} ".format(**locals())
-               for i in grp_list])
 
 def validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
-       out_table, func_name, **kwargs):
-       """
-       Validates graph tables (vertex and edge) as well as the output table.
-       """
-       _assert(out_table and out_table.strip().lower() not in ('null', ''),
-               "Graph {func_name}: Invalid output table 
name!".format(**locals()))
-       _assert(not table_exists(out_table),
-               "Graph {func_name}: Output table already 
exists!".format(**locals()))
-
-       _assert(vertex_table and vertex_table.strip().lower() not in ('null', 
''),
-               "Graph {func_name}: Invalid vertex table 
name!".format(**locals()))
-       _assert(table_exists(vertex_table),
-               "Graph {func_name}: Vertex table ({vertex_table}) is 
missing!".format(
-                       **locals()))
-       _assert(not table_is_empty(vertex_table),
-               "Graph {func_name}: Vertex table ({vertex_table}) is 
empty!".format(
-                       **locals()))
-
-       _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
-               "Graph {func_name}: Invalid edge table 
name!".format(**locals()))
-       _assert(table_exists(edge_table),
-               "Graph {func_name}: Edge table ({edge_table}) is 
missing!".format(
-                       **locals()))
-       _assert(not table_is_empty(edge_table),
-               "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
-                       **locals()))
-
-       existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
-       _assert(vertex_id in existing_cols,
-               """Graph {func_name}: The vertex column {vertex_id} is not 
present in vertex table ({vertex_table}) """.
-               format(**locals()))
-       _assert(columns_exist_in_table(edge_table, edge_params.values()),
-               """Graph {func_name}: Not all columns from {cols} are present 
in edge table ({edge_table})""".
-               format(cols=edge_params.values(), **locals()))
-
-       return None
+                          out_table, func_name, **kwargs):
+    """
+    Validates graph tables (vertex and edge) as well as the output table.
+    """
+    _assert(out_table and out_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid output table name!".format(**locals()))
+    _assert(not table_exists(out_table),
+            "Graph {func_name}: Output table already 
exists!".format(**locals()))
+
+    _assert(vertex_table and vertex_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid vertex table name!".format(**locals()))
+    _assert(table_exists(vertex_table),
+            "Graph {func_name}: Vertex table ({vertex_table}) is 
missing!".format(
+        **locals()))
+    _assert(not table_is_empty(vertex_table),
+            "Graph {func_name}: Vertex table ({vertex_table}) is 
empty!".format(
+        **locals()))
+
+    _assert(edge_table and edge_table.strip().lower() not in ('null', ''),
+            "Graph {func_name}: Invalid edge table name!".format(**locals()))
+    _assert(table_exists(edge_table),
+            "Graph {func_name}: Edge table ({edge_table}) is missing!".format(
+        **locals()))
+    _assert(not table_is_empty(edge_table),
+            "Graph {func_name}: Edge table ({edge_table}) is empty!".format(
+        **locals()))
+
+    existing_cols = set(unquote_ident(i) for i in get_cols(vertex_table))
+    _assert(vertex_id in existing_cols,
+            """Graph {func_name}: The vertex column {vertex_id} is not present 
in vertex table ({vertex_table}) """.
+            format(**locals()))
+    _assert(columns_exist_in_table(edge_table, edge_params.values()),
+            """Graph {func_name}: Not all columns from {cols} are present in 
edge table ({edge_table})""".
+            format(cols=edge_params.values(), **locals()))
+
+    return None
+
 
 def get_graph_usage(schema_madlib, func_name, other_text):
 
-       usage = """
-----------------------------------------------------------------------------
-                            USAGE
-----------------------------------------------------------------------------
- SELECT {schema_madlib}.{func_name}(
-    vertex_table  TEXT, -- Name of the table that contains the vertex data.
-    vertex_id     TEXT, -- Name of the column containing the vertex ids.
-    edge_table    TEXT, -- Name of the table that contains the edge data.
-    edge_args     TEXT{comma} -- A comma-delimited string containing multiple
-                        -- named arguments of the form "name=value".
-    {other_text}
-);
-
-The following parameters are supported for edge table arguments ('edge_args'
-       above):
-
-src (default = 'src')          : Name of the column containing the source
-                               vertex ids in the edge table.
-dest (default = 'dest')                : Name of the column containing the 
destination
-                               vertex ids in the edge table.
-weight (default = 'weight')    : Name of the column containing the weight of
-                               edges in the edge table.
-""".format(schema_madlib=schema_madlib, func_name=func_name,
-       other_text=other_text, comma = ',' if other_text is not None else ' ')
-
-       return usage
+    usage = """
+    
----------------------------------------------------------------------------
+                                USAGE
+    
----------------------------------------------------------------------------
+     SELECT {schema_madlib}.{func_name}(
+        vertex_table  TEXT, -- Name of the table that contains the vertex data.
+        vertex_id     TEXT, -- Name of the column containing the vertex ids.
+        edge_table    TEXT, -- Name of the table that contains the edge data.
+        edge_args     TEXT{comma} -- A comma-delimited string containing 
multiple
+                            -- named arguments of the form "name=value".
+        {other_text}
+    );
+
+    The following parameters are supported for edge table arguments
+    ('edge_args' above):
+
+    src (default = 'src'): Name of the column containing the source
+                           vertex ids in the edge table.
+    dest (default = 'dest'): Name of the column containing the destination
+                            vertex ids in the edge table.
+    weight (default = 'weight'): Name of the column containing the weight of
+                                 edges in the edge table.
+    """.format(schema_madlib=schema_madlib,
+               func_name=func_name,
+               other_text=other_text,
+               comma=',' if other_text is not None else ' ')
+    return usage

Reply via email to