Multiple: Add grouping support for SSSP and support GPDB5

JIRA: MADLIB-1081

- This commit adds grouping support for SSSP as well as its path function.
- Update chi2 test for GPDB5 alpha compatibility.
- Decouple DROP and CREATE statements for various modules.

Closes #113


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/8faf6226
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/8faf6226
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/8faf6226

Branch: refs/heads/latest_release
Commit: 8faf62263f6c5aa4281e2d3dc33e389d41784c0e
Parents: c82b9d0
Author: Orhan Kislal <okis...@pivotal.io>
Authored: Mon Apr 17 11:17:41 2017 -0700
Committer: Orhan Kislal <okis...@pivotal.io>
Committed: Mon Apr 17 11:17:41 2017 -0700

----------------------------------------------------------------------
 .../elastic_net_generate_result.py_in           |   2 +-
 .../postgres/modules/graph/graph_utils.py_in    |   2 +-
 src/ports/postgres/modules/graph/sssp.py_in     | 716 ++++++++++++++-----
 src/ports/postgres/modules/graph/sssp.sql_in    | 132 +++-
 .../postgres/modules/graph/test/sssp.sql_in     |  75 +-
 src/ports/postgres/modules/pca/pca.py_in        |   6 +-
 .../modules/stats/test/chi2_test.sql_in         |   2 +-
 .../validation/internal/cross_validation.py_in  |   6 +-
 8 files changed, 739 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
----------------------------------------------------------------------
diff --git 
a/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in 
b/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
index c48beca..6246ed9 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
@@ -81,8 +81,8 @@ def _elastic_net_generate_result(optimizer, iteration_run, 
**args):
                    schema_madlib=args["schema_madlib"])
 
     # Create the output table
+    plpy.execute("DROP TABLE IF EXISTS {tbl_result}".format(**args))
     plpy.execute("""
-             DROP TABLE IF EXISTS {tbl_result};
              CREATE TABLE {tbl_result} (
                  {select_grouping_info}
                  family            text,

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/graph/graph_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/graph_utils.py_in 
b/src/ports/postgres/modules/graph/graph_utils.py_in
index 2d83301..25f70a5 100644
--- a/src/ports/postgres/modules/graph/graph_utils.py_in
+++ b/src/ports/postgres/modules/graph/graph_utils.py_in
@@ -72,7 +72,7 @@ def validate_graph_coding(vertex_table, vertex_id, 
edge_table, edge_params,
                """Graph {func_name}: The vertex column {vertex_id} is not 
present in vertex table ({vertex_table}) """.
                format(**locals()))
        _assert(columns_exist_in_table(edge_table, edge_params.values()),
-               """Graph {func_name}: Not all columns from {cols} present in 
edge table ({edge_table})""".
+               """Graph {func_name}: Not all columns from {cols} are present 
in edge table ({edge_table})""".
                format(cols=edge_params.values(), **locals()))
 
        return None

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/graph/sssp.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.py_in 
b/src/ports/postgres/modules/graph/sssp.py_in
index 4d27761..2520830 100644
--- a/src/ports/postgres/modules/graph/sssp.py_in
+++ b/src/ports/postgres/modules/graph/sssp.py_in
@@ -33,27 +33,56 @@ from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
-from utilities.validate_args import get_cols
-from utilities.validate_args import unquote_ident
+from utilities.utilities import _string_to_array
+from utilities.utilities import split_quoted_delimited_str
 from utilities.validate_args import table_exists
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import table_is_empty
+from utilities.validate_args import get_cols_and_types
+from utilities.validate_args import get_expr_type
 
 m4_changequote(`<!', `!>')
 
+
+def _check_groups(tbl1, tbl2, grp_list):
+
+       """
+       Helper function for joining tables with groups.
+       Args:
+               @param tbl1       Name of the first table
+               @param tbl2       Name of the second table
+               @param grp_list   The list of grouping columns
+       """
+
+       return ' AND '.join([" {tbl1}.{i} = {tbl2}.{i} ".format(**locals())
+               for i in grp_list])
+
+def _grp_from_table(tbl, grp_list):
+
+       """
+       Helper function for selecting grouping columns of a table
+       Args:
+               @param tbl        Name of the table
+               @param grp_list   The list of grouping columns
+       """
+       return ' , '.join([" {tbl}.{i} ".format(**locals())
+               for i in grp_list])
+
 def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
-               edge_args, source_vertex, out_table, **kwargs):
+               edge_args, source_vertex, out_table, grouping_cols, **kwargs):
+
        """
     Single source shortest path function for graphs using the Bellman-Ford
     algorhtm [1].
     Args:
-        @param vertex_table     Name of the table that contains the vertex 
data.
-        @param vertex_id        Name of the column containing the vertex ids.
-        @param edge_table       Name of the table that contains the edge data.
-        @param edge_args        A comma-delimited string containing multiple
-                                                       named arguments of the 
form "name=value".
-        @param source_vertex    The source vertex id for the algorithm to 
start.
-        @param out_table           Name of the table to store the result of 
SSSP.
+        @param vertex_table    Name of the table that contains the vertex data.
+        @param vertex_id       Name of the column containing the vertex ids.
+        @param edge_table      Name of the table that contains the edge data.
+        @param edge_args       A comma-delimited string containing multiple
+                               named arguments of the form "name=value".
+        @param source_vertex   The source vertex id for the algorithm to start.
+        @param out_table       Name of the table to store the result of SSSP.
+        @param grouping_cols   The list of grouping columns.
 
     [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm
     """
@@ -61,6 +90,7 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
        with MinWarning("warning"):
 
                INT_MAX = 2147483647
+               INFINITY = "'Infinity'"
                EPSILON = 0.000001
 
                message = unique_string(desp='message')
@@ -73,8 +103,23 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
                edge_params = extract_keyvalue_params(edge_args,
                                             params_types,
                                             default_args)
+
+               # Prepare the input for recording in the summary table
                if vertex_id is None:
+                       v_st= "NULL"
                        vertex_id = "id"
+               else:
+                       v_st = vertex_id
+               if edge_args is None:
+                       e_st = "NULL"
+               else:
+                       e_st = edge_args
+               if grouping_cols is None:
+                       g_st = "NULL"
+                       glist = None
+               else:
+                       g_st = grouping_cols
+                       glist = split_quoted_delimited_str(grouping_cols)
 
                src = edge_params["src"]
                dest = edge_params["dest"]
@@ -85,47 +130,91 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
                local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
                        <!"DISTRIBUTED BY (id)"!>)
 
-               validate_sssp(vertex_table, vertex_id, edge_table,
-                       edge_params, source_vertex, out_table)
+               is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>)
+               _validate_sssp(vertex_table, vertex_id, edge_table,
+                       edge_params, source_vertex, out_table, glist)
 
                plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format(
                        message,oldupdate,newupdate))
 
+               # Initialize grouping related variables
+               comma_grp = ""
+               comma_grp_e = ""
+               comma_grp_m = ""
+               grp_comma = ""
+               checkg_oo = ""
+               checkg_eo = ""
+               checkg_ex = ""
+               checkg_om = ""
+               group_by = ""
+
+               if grouping_cols is not None:
+                       comma_grp = " , " + grouping_cols
+                       group_by = " , " + _grp_from_table(edge_table,glist)
+                       comma_grp_e = " , " + _grp_from_table(edge_table,glist)
+                       comma_grp_m = " , " + _grp_from_table("message",glist)
+                       grp_comma = grouping_cols + " , "
+
+                       checkg_oo_sub = 
_check_groups(out_table,"oldupdate",glist)
+                       checkg_oo = " AND " + checkg_oo_sub
+                       checkg_eo = " AND " + 
_check_groups(edge_table,"oldupdate",glist)
+                       checkg_ex = " AND " + 
_check_groups(edge_table,"x",glist)
+                       checkg_om = " AND " + 
_check_groups("out_table","message",glist)
+
+               w_type = get_expr_type(weight,edge_table).lower()
+               init_w = INT_MAX
+               if w_type in ['double precision','float8']:
+                       init_w = INFINITY
+
                # We keep a table of every vertex, the minimum cost to that 
destination
                # seen so far and the parent to this vertex in the associated 
shortest
-               # path. This table will be updated throughtout the execution.
+               # path. This table will be updated throughout the execution.
                plpy.execute(
-                       """ CREATE TABLE {out_table} AS
-                               SELECT {vertex_id} AS {vertex_id},
-                                       CAST('Infinity' AS DOUBLE PRECISION) AS 
{weight},
-                                       NULL::INT AS parent
-                               FROM {vertex_table}
-                               WHERE {vertex_id} IS NOT NULL
+                       """ CREATE TABLE {out_table} AS ( SELECT
+                                       {grp_comma} {src} AS {vertex_id}, 
{weight},
+                                       {src} AS parent FROM {edge_table} LIMIT 
0)
                                {distribution} """.format(**locals()))
 
+               # We keep a summary table to keep track of the parameters used 
for this
+               # SSSP run. This table is used in the path finding function to 
eliminate
+               # the need for repetition.
+               plpy.execute( """ CREATE TABLE {out_table}_summary  (
+                       vertex_table            TEXT,
+                       vertex_id               TEXT,
+                       edge_table              TEXT,
+                       edge_args               TEXT,
+                       source_vertex           INTEGER,
+                       out_table               TEXT,
+                       grouping_cols           TEXT)
+                       """.format(**locals()))
+               plpy.execute( """ INSERT INTO {out_table}_summary VALUES
+                       ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}',
+                       {source_vertex}, '{out_table}', '{g_st}')
+                       """.format(**locals()))
+
                # We keep 2 update tables and alternate them during the 
execution.
                # This is necessary since we need to know which vertices are 
updated in
                # the previous iteration to calculate the next set of updates.
                plpy.execute(
-                       """ CREATE TEMP TABLE {oldupdate}(
-                               id INT, val DOUBLE PRECISION, parent INT)
+                       """ CREATE TEMP TABLE {oldupdate} AS ( SELECT
+                                       {src} AS id, {weight},
+                                       {src} AS parent {comma_grp} FROM 
{edge_table} LIMIT 0)
                                {local_distribution}
                                """.format(**locals()))
                plpy.execute(
-                       """ CREATE TEMP TABLE {newupdate}(
-                               id INT, val DOUBLE PRECISION, parent INT)
+                       """ CREATE TEMP TABLE {newupdate} AS ( SELECT
+                                       {src} AS id, {weight},
+                                       {src} AS parent {comma_grp} FROM 
{edge_table} LIMIT 0)
                                {local_distribution}
                                """.format(**locals()))
 
                # Since HAWQ does not allow us to update, we create a new table 
and
-               # rename at every iteration
-               temp_table = unique_string(desp='temp')
-               sql = m4_ifdef(<!__HAWQ__!>,
-                       """ CREATE TABLE {temp_table} (
-                                       {vertex_id} INT, {weight} DOUBLE 
PRECISION, parent INT)
-                                       {distribution};
-                       """,  <!''!>)
-               plpy.execute(sql.format(**locals()))
+               # rename at every iteration.
+               if is_hawq:
+                       temp_table = unique_string(desp='temp')
+                       sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM 
{out_table} )
+                               {distribution} """
+                       plpy.execute(sql.format(**locals()))
 
                # GPDB and HAWQ have distributed by clauses to help them with 
indexing.
                # For Postgres we add the indices manually.
@@ -137,45 +226,117 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
                        <!''!>)
                plpy.execute(sql_index)
 
-               # The source can be reached with 0 cost and it has itself as 
the parent.
-               plpy.execute(
-                       """ INSERT INTO {oldupdate}
-                               VALUES({source_vertex},0,{source_vertex})
-                       """.format(**locals()))
+               # The initialization step is quite different when grouping is 
involved
+               # since not every group (subgraph) will have the same set of 
vertices.
+
+               # Example:
+               # Assume there are two grouping columns g1 and g2
+               # g1 values are 0 and 1. g2 values are 5 and 6
+               if grouping_cols is not None:
+
+                       distinct_grp_table = unique_string(desp='grp')
+                       plpy.execute(""" DROP TABLE IF EXISTS 
{distinct_grp_table} """.
+                               format(**locals()))
+                       plpy.execute( """ CREATE TEMP TABLE 
{distinct_grp_table} AS
+                               SELECT DISTINCT {grouping_cols} FROM 
{edge_table} """.
+                               format(**locals()))
+                       subq = unique_string(desp='subquery')
+
+                       checkg_ds_sub = 
_check_groups(distinct_grp_table,subq,glist)
+                       grp_d_comma = _grp_from_table(distinct_grp_table,glist) 
+","
+
+                       plpy.execute(
+                               """ INSERT INTO {out_table}
+                               SELECT {grp_d_comma} {vertex_id} AS {vertex_id},
+                                       {init_w} AS {weight}, NULL::INT AS 
parent
+                               FROM {distinct_grp_table} INNER JOIN
+                                       (
+                                       SELECT {src} AS {vertex_id} {comma_grp}
+                                       FROM {edge_table}
+                                       UNION
+                                       SELECT {dest} AS {vertex_id} {comma_grp}
+                                       FROM {edge_table}
+                                       ) {subq} ON ({checkg_ds_sub})
+                               WHERE {vertex_id} IS NOT NULL
+                               """.format(**locals()))
+
+                       plpy.execute(
+                               """ INSERT INTO {oldupdate}
+                                       SELECT {source_vertex}, 0, 
{source_vertex},
+                                       {grouping_cols}
+                                       FROM {distinct_grp_table}
+                               """.format(**locals()))
+
+                       # The maximum number of vertices for any group.
+                       # Used for determining negative cycles.
+                       v_cnt = plpy.execute(
+                               """ SELECT max(count) as max FROM (
+                                               SELECT count({vertex_id}) AS 
count
+                                               FROM {out_table}
+                                               GROUP BY {grouping_cols}) x
+                               """.format(**locals()))[0]['max']
+                       plpy.execute("DROP TABLE IF EXISTS 
{0}".format(distinct_grp_table))
+               else:
+                       plpy.execute(
+                               """ INSERT INTO {out_table}
+                               SELECT {vertex_id} AS {vertex_id},
+                                       {init_w} AS {weight},
+                                       NULL AS parent
+                               FROM {vertex_table}
+                               WHERE {vertex_id} IS NOT NULL
+                                """.format(**locals()))
+
+                       # The source can be reached with 0 cost and it has 
itself as the
+                       # parent.
+                       plpy.execute(
+                               """ INSERT INTO {oldupdate}
+                                       
VALUES({source_vertex},0,{source_vertex})
+                               """.format(**locals()))
+
+                       v_cnt = plpy.execute(
+                               """ SELECT count(*) FROM {vertex_table}
+                               WHERE {vertex_id} IS NOT NULL
+                               """.format(**locals()))[0]['count']
 
-               v_cnt = plpy.execute(
-                       """SELECT count(*) FROM {vertex_table}
-                       WHERE {vertex_id} IS NOT 
NULL""".format(**locals()))[0]['count']
                for i in range(0,v_cnt+1):
 
-                       # Apply the updates calculated in the last iteration
-                       sql = m4_ifdef(<!__HAWQ__!>,
-                               <!"""
+                       # Apply the updates calculated in the last iteration.
+                       if is_hawq:
+                               sql = """
                                TRUNCATE TABLE {temp_table};
                                INSERT INTO {temp_table}
                                        SELECT *
                                        FROM {out_table}
-                                       WHERE {out_table}.{vertex_id} NOT IN (
-                                               SELECT {oldupdate}.id FROM 
{oldupdate})
+                                       WHERE NOT EXISTS (
+                                               SELECT 1
+                                               FROM {oldupdate} as oldupdate
+                                               WHERE {out_table}.{vertex_id} = 
oldupdate.id
+                                               {checkg_oo})
                                        UNION
-                                       SELECT * FROM {oldupdate};
+                                       SELECT {grp_comma} id, {weight}, parent 
FROM {oldupdate};
                                DROP TABLE {out_table};
                                ALTER TABLE {temp_table} RENAME TO {out_table};
-                               CREATE TABLE {temp_table} (
-                                       {vertex_id} INT, {weight} DOUBLE 
PRECISION, parent INT)
-                                       {distribution};
-                               """!>,
-                               <!"""
+                               CREATE TABLE {temp_table} AS (
+                                       SELECT * FROM {out_table} LIMIT 0)
+                                       {distribution};"""
+                               plpy.execute(sql.format(**locals()))
+                               ret = plpy.execute("SELECT id FROM {0} LIMIT 1".
+                                       format(oldupdate))
+                       else:
+                               sql = """
                                UPDATE {out_table} SET
-                               {weight}=oldupdate.val,
+                               {weight}=oldupdate.{weight},
                                parent=oldupdate.parent
                                FROM
                                {oldupdate} AS oldupdate
                                WHERE
-                               {out_table}.{vertex_id}=oldupdate.id
-                               """!>)
-                       plpy.execute(sql.format(**locals()))
+                               {out_table}.{vertex_id}=oldupdate.id AND
+                               {out_table}.{weight}>oldupdate.{weight} 
{checkg_oo}
+                               """
+                               ret = plpy.execute(sql.format(**locals()))
 
+                       if ret.nrows() == 0:
+                               break
 
                        plpy.execute("TRUNCATE TABLE {0}".format(newupdate))
 
@@ -194,105 +355,237 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, 
edge_table,
                        # for comparison.
 
                        # Once we have a list of edges and values (stores as 
'message'),
-                       # we check if these values are lower than the existing 
shortest path
-                       # values.
+                       # we check if these values are lower than the existing 
shortest
+                       # path values.
 
                        sql = (""" INSERT INTO {newupdate}
-                               SELECT DISTINCT ON (message.id) message.id AS 
id,
-                                       message.val AS val,
-                                       message.parent AS parent
+                               SELECT DISTINCT ON (message.id {comma_grp})
+                                       message.id AS id,
+                                       message.{weight} AS {weight},
+                                       message.parent AS parent {comma_grp_m}
                                FROM {out_table} AS out_table INNER JOIN
                                        (
-                                               SELECT edge_table.{dest} AS id, 
x.val AS val,
-                                                       oldupdate.id AS parent
+                                       SELECT {edge_table}.{dest} AS id, 
x.{weight} AS {weight},
+                                               oldupdate.id AS parent 
{comma_grp_e}
+                                       FROM {oldupdate} AS oldupdate INNER JOIN
+                                               {edge_table}  ON
+                                                       ({edge_table}.{src} = 
oldupdate.id {checkg_eo})
+                                               INNER JOIN
+                                               (
+                                               SELECT {edge_table}.{dest} AS 
id,
+                                                       min(oldupdate.{weight} +
+                                                               
{edge_table}.{weight}) AS {weight} {comma_grp_e}
                                                FROM {oldupdate} AS oldupdate 
INNER JOIN
-                                                       {edge_table} AS 
edge_table ON
-                                                       (edge_table.{src} = 
oldupdate.id) INNER JOIN
-                                                       (
-                                                               SELECT 
edge_table.{dest} AS id,
-                                                                       
min(oldupdate.val + edge_table.{weight})
-                                                                       AS val
-                                                               FROM 
{oldupdate} AS oldupdate INNER JOIN
-                                                                       
{edge_table} AS edge_table ON
-                                                                       
(edge_table.{src}=oldupdate.id)
-                                                               GROUP BY 
edge_table.{dest}
-                                                       ) x ON 
(edge_table.{dest} = x.id)
-                                               WHERE ABS(oldupdate.val + 
edge_table.{weight} - x.val)
-                                                       < {EPSILON}
-                                       ) AS message ON (message.id = 
out_table.{vertex_id})
-                               WHERE message.val<out_table.{weight}
+                                                       {edge_table}  ON
+                                                       
({edge_table}.{src}=oldupdate.id {checkg_eo})
+                                               GROUP BY {edge_table}.{dest} 
{comma_grp_e}
+                                               ) x
+                                               ON ({edge_table}.{dest} = x.id 
{checkg_ex} )
+                                       WHERE ABS(oldupdate.{weight} + 
{edge_table}.{weight}
+                                                               - x.{weight}) < 
{EPSILON}
+                                       ) message
+                                       ON (message.id = out_table.{vertex_id} 
{checkg_om})
+                               WHERE message.{weight}<out_table.{weight}
                                """.format(**locals()))
 
-                       # If there are no updates, SSSP is finalized
-                       ret = plpy.execute(sql)
-                       if ret.nrows() == 0:
-                               break
+                       plpy.execute(sql)
 
-                       # Swap the update tables for the next iteration
+                       # Swap the update tables for the next iteration.
                        tmp = oldupdate
                        oldupdate = newupdate
                        newupdate = tmp
 
-               # Bellman-Ford should converge in |V|-1 iterations.
+               plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate))
+               # The algorithm should converge in less than |V| iterations.
+               # Otherwise there is a negative cycle in the graph.
                if i == v_cnt:
-                       plpy.execute("DROP TABLE IF EXISTS 
{out_table}".format(**locals()))
-                       plpy.error("Graph SSSP: Detected a negative cycle in 
the graph.")
-
-               m4_ifdef(<!__HAWQ__!>,
-                       plpy.execute("DROP TABLE {temp_table} 
".format(**locals())), <!''!>)
+                       if grouping_cols is None:
+                               plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}".
+                                       format(out_table, out_table+"_summary", 
oldupdate))
+                               if is_hawq:
+                                       plpy.execute("DROP TABLE IF EXISTS 
{0}".format(temp_table))
+                               plpy.error("Graph SSSP: Detected a negative 
cycle in the graph.")
+
+                       # It is possible that not all groups has negative 
cycles.
+                       else:
+
+                               # gsql is the string created by collating 
grouping columns.
+                               # By looking at the oldupdate table we can see 
which groups
+                               # are in a negative cycle.
+
+                               negs = plpy.execute(
+                                       """ SELECT array_agg(DISTINCT 
({grouping_cols})) AS grp
+                                               FROM {oldupdate}
+                                       """.format(**locals()))[0]['grp']
+
+                               # Delete the groups with negative cycles from 
the output table.
+                               sql_del = """ DELETE FROM {out_table}
+                                       USING {oldupdate} AS oldupdate
+                                       WHERE {checkg_oo_sub}"""
+                               if is_hawq:
+                                       sql_del = """
+                                               TRUNCATE TABLE {temp_table};
+                                               INSERT INTO {temp_table}
+                                                       SELECT *
+                                                       FROM {out_table}
+                                                       WHERE NOT EXISTS(
+                                                               SELECT 1
+                                                               FROM 
{oldupdate} as oldupdate
+                                                               WHERE 
{checkg_oo_sub}
+                                                               );
+                                               DROP TABLE {out_table};
+                                               ALTER TABLE {temp_table} RENAME 
TO {out_table};"""
+
+                               plpy.execute(sql_del.format(**locals()))
+
+                               # If every group has a negative cycle,
+                               # drop the output table as well.
+                               if table_is_empty(out_table):
+                                       plpy.execute("DROP TABLE IF EXISTS 
{0},{1}".
+                                               
format(out_table,out_table+"_summary"))
+
+                               plpy.warning(
+                                       """Graph SSSP: Detected a negative 
cycle in the """ +
+                                       """sub-graphs of following groups: 
{0}.""".
+                                       format(str(negs)[1:-1]))
+
+               plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate))
+               if is_hawq:
+                       plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
+                               format(**locals()))
 
        return None
 
-def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
+def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
+       **kwargs):
        """
-       Helper function that can be used to get the shortest path for a vertex
+    Helper function that can be used to get the shortest path for a vertex
     Args:
-       @param source_table     Name of the table that contains the SSSP output.
-        @param out_table       The vertex that will be the destination of the
-                                       desired path.
+        @param sssp_table   Name of the table that contains the SSSP output.
+        @param dest_vertex  The vertex that will be the destination of the
+                            desired path.
+        @param path_table   Name of the output table that contains the path.
        """
+       with MinWarning("warning"):
+               _validate_get_path(sssp_table, dest_vertex, path_table)
 
-       validate_get_path(sssp_table, dest_vertex)
-       cur = dest_vertex
-       cols = get_cols(sssp_table)
-       id = cols[0]
-       ret = [dest_vertex]
-       plan_name = unique_string(desp='plan')
-
-       # Follow the 'parent' chain until you reach the source.
-       # We don't need to know what the source is since it is the only vertex 
with
-       # itself as its parent
-       plpy.execute(""" PREPARE {plan_name} (int) AS
-               SELECT parent FROM {sssp_table} WHERE {id} = $1 LIMIT 1
-               """.format(**locals()))
-       sql = "EXECUTE {plan_name} ({cur})"
-       parent = plpy.execute(sql.format(**locals()))
+               temp1_name = unique_string(desp='temp1')
+               temp2_name = unique_string(desp='temp2')
 
-       if parent.nrows() == 0:
-               plpy.error(
-                       "Graph SSSP: Vertex {0} is not present in the sssp 
table {1}".
-                       format(dest_vertex,sssp_table))
-
-       while 1:
-               parent = parent[0]['parent']
-               if parent == cur:
-                       ret.reverse()
-                       return ret
-               else:
-                       ret.append(parent)
-                       cur = parent
-               parent = plpy.execute(sql.format(**locals()))
+               select_grps = ""
+               check_grps_t1 = ""
+               check_grps_t2 = ""
+               check_grps_pt1 = ""
+               check_grps_pt2 = ""
+               checkg_po = ""
+               grp_comma = ""
+               tmp = ""
+
+               summary = plpy.execute("SELECT * FROM 
{0}_summary".format(sssp_table))
+               vertex_id = summary[0]['vertex_id']
+               source_vertex = summary[0]['source_vertex']
+
+               if vertex_id == "NULL":
+                       vertex_id = "id"
+
+               grouping_cols = summary[0]['grouping_cols']
+               if grouping_cols == "NULL":
+                       grouping_cols = None
+
+               if grouping_cols is not None:
+                       glist = split_quoted_delimited_str(grouping_cols)
+                       select_grps = _grp_from_table(sssp_table,glist) + " , "
+                       check_grps_t1 = " AND " + _check_groups(
+                               sssp_table,temp1_name,glist)
+                       check_grps_t2 = " AND " + _check_groups(
+                               sssp_table,temp2_name,glist)
+
+                       checkg_po = " WHERE " + _check_groups(
+                               path_table,"oldupdate",glist)
+                       grp_comma = grouping_cols + " , "
+
+               if source_vertex == dest_vertex:
+                       plpy.execute("""
+                               CREATE TABLE {path_table} AS
+                               SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] 
AS path
+                               FROM {sssp_table} WHERE {vertex_id} = 
{dest_vertex}
+                               """.format(**locals()))
+                       return
+
+               plpy.execute( "DROP TABLE IF EXISTS {0},{1}".
+                       format(temp1_name,temp2_name));
+               out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS
+                               SELECT {grp_comma} {sssp_table}.parent AS 
{vertex_id},
+                                       ARRAY[{dest_vertex}] AS path
+                               FROM {sssp_table}
+                               WHERE {vertex_id} = {dest_vertex}
+                                       AND {sssp_table}.parent IS NOT NULL
+                       """.format(**locals()))
+
+               plpy.execute("""
+                       CREATE TEMP TABLE {temp2_name} AS
+                               SELECT * FROM {temp1_name} LIMIT 0
+                       """.format(**locals()))
+
+               # Follow the 'parent' chain until you reach the source.
+               while out.nrows() > 0:
+
+                       plpy.execute("TRUNCATE TABLE 
{temp2_name}".format(**locals()))
+                       # If the vertex id is not the source vertex,
+                       # Add it to the path and move to its parent
+                       out = plpy.execute(
+                               """ INSERT INTO {temp2_name}
+                               SELECT {select_grps} {sssp_table}.parent AS 
{vertex_id},
+                                       {sssp_table}.{vertex_id} || 
{temp1_name}.path AS path
+                               FROM {sssp_table} INNER JOIN {temp1_name} ON
+                                       ({sssp_table}.{vertex_id} = 
{temp1_name}.{vertex_id}
+                                               {check_grps_t1})
+                               WHERE {source_vertex} <> 
{sssp_table}.{vertex_id}
+                               """.format(**locals()))
+
+                       tmp = temp2_name
+                       temp2_name = temp1_name
+                       temp1_name = tmp
+
+                       tmp = check_grps_t1
+                       check_grps_t1 = check_grps_t2
+                       check_grps_t2 = tmp
+
+               # Add the source vertex to the beginning of every path and
+               # add the empty arrays for the groups that don't have a path to 
reach
+               # the destination vertex
+               plpy.execute("""
+                       CREATE TABLE {path_table} AS
+                       SELECT {grp_comma} {source_vertex} || path AS path
+                       FROM {temp2_name}
+                       UNION
+                       SELECT {grp_comma} '{{}}'::INT[] AS path
+                       FROM {sssp_table}
+                       WHERE {vertex_id} = {dest_vertex}
+                               AND {sssp_table}.parent IS NULL
+                       """.format(**locals()))
+
+               out = plpy.execute("SELECT 1 FROM {0} LIMIT 
1".format(path_table))
+
+               if out.nrows() == 0:
+                       plpy.error(
+                               "Graph SSSP: Vertex {0} is not present in the 
SSSP table {1}".
+                               format(dest_vertex,sssp_table))
+
+               plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp2_name}".
+                       format(**locals()))
 
        return None
 
-def validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
-       source_vertex, out_table, **kwargs):
+
+def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params,
+       source_vertex, out_table, glist, **kwargs):
 
        validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                out_table,'SSSP')
 
        _assert(isinstance(source_vertex,int),
-               """Graph SSSP: Source vertex {source_vertex} has to be an 
integer """.
+               """Graph SSSP: Source vertex {source_vertex} has to be an 
integer.""".
                format(**locals()))
        src_exists = plpy.execute("""
                SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex}
@@ -300,8 +593,8 @@ def validate_sssp(vertex_table, vertex_id, edge_table, 
edge_params,
 
        if src_exists.nrows() == 0:
                plpy.error(
-                       """Graph SSSP: Source vertex {source_vertex} is not 
present in the
-                       vertex table {vertex_table} """.format(**locals()))
+                       """Graph SSSP: Source vertex {source_vertex} is not 
present in the vertex table {vertex_table}.""".
+                       format(**locals()))
 
        vt_error = plpy.execute(
                """ SELECT {vertex_id}
@@ -312,12 +605,20 @@ def validate_sssp(vertex_table, vertex_id, edge_table, 
edge_params,
 
        if vt_error.nrows() != 0:
                plpy.error(
-                       """Graph SSSP: Source vertex table {vertex_table}
-                       contains duplicate vertex id's """.format(**locals()))
+                       """Graph SSSP: Source vertex table {vertex_table} 
contains duplicate vertex id's.""".
+                       format(**locals()))
+
+       _assert(not table_exists(out_table+"_summary"),
+               "Graph SSSP: Output summary table already exists!")
+
+       if glist is not None:
+               _assert(columns_exist_in_table(edge_table, glist),
+                       """Graph SSSP: Not all columns from {glist} are present 
in edge table ({edge_table}).""".
+                       format(**locals()))
 
        return None
 
-def validate_get_path(sssp_table, dest_vertex, **kwargs):
+def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs):
 
        _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''),
                "Graph SSSP: Invalid SSSP table name!")
@@ -326,21 +627,31 @@ def validate_get_path(sssp_table, dest_vertex, **kwargs):
        _assert(not table_is_empty(sssp_table),
                "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table))
 
+       summary = sssp_table+"_summary"
+       _assert(table_exists(summary),
+               "Graph SSSP: SSSP summary table ({0}) is 
missing!".format(summary))
+       _assert(not table_is_empty(summary),
+               "Graph SSSP: SSSP summary table ({0}) is 
empty!".format(summary))
+
+       _assert(not table_exists(path_table),
+               "Graph SSSP: Output path table already exists!")
+
+       return None
 
 def graph_sssp_help(schema_madlib, message, **kwargs):
-    """
-    Help function for graph_sssp and graph_sssp_get_path
+       """
+       Help function for graph_sssp and graph_sssp_get_path
 
-    Args:
-        @param schema_madlib
-        @param message: string, Help message string
-        @param kwargs
+       Args:
+               @param schema_madlib
+               @param message: string, Help message string
+               @param kwargs
 
-    Returns:
-        String. Help/usage information
-    """
-    if not message:
-        help_string = """
+       Returns:
+           String. Help/usage information
+       """
+       if not message:
+               help_string = """
 -----------------------------------------------------------------------
                             SUMMARY
 -----------------------------------------------------------------------
@@ -352,41 +663,120 @@ weights of its constituent edges is minimized.
 For more details on function usage:
     SELECT {schema_madlib}.graph_sssp('usage')
             """
-    elif message in ['usage', 'help', '?']:
-        help_string = """
+       elif message.lower() in ['usage', 'help', '?']:
+               help_string = """
 {graph_usage}
 
 To retrieve the path for a specific vertex:
 
  SELECT {schema_madlib}.graph_sssp_get_path(
     sssp_table TEXT, -- Name of the table that contains the SSSP output.
-    dest_vertex        INT   -- The vertex that will be the destination of the
-                 -- desired path.
+    dest_vertex        INT,  -- The vertex that will be the destination of the
+                      -- desired path.
+    path_table  TEXT  -- Name of the output table that contains the path.
 );
 
 ----------------------------------------------------------------------------
                             OUTPUT
 ----------------------------------------------------------------------------
-The output table ('out_table' above) will contain a row for every vertex from
-vertex_table and have the following columns:
-
-vertex_id      : The id for the destination. Will use the input parameter
-               (vertex_id) for column naming.
-weight                 : The total weight of the shortest path from the source 
vertex
-               to this particular vertex. Will use the input parameter (weight)
-               for column naming.
-parent                 : The parent of this vertex in the shortest path from 
source.
-               Will use "parent" for column naming.
-
-The graph_sssp_get_path function will return an INT array that contains the
-shortest path from the initial source vertex to the desired destination vertex.
+The output of SSSP ('out_table' above) contains a row for every vertex of
+every group and has the following columns (in addition to the grouping
+columns):
+  - vertex_id : The id for the destination. Will use the input parameter
+                'vertex_id' for column naming.
+  - weight    : The total weight of the shortest path from the source vertex
+              to this particular vertex.
+              Will use the input parameter 'weight' for column naming.
+  - parent    : The parent of this vertex in the shortest path from source.
+              Will use 'parent' for column naming.
+
+The output of graph_sssp_get_path ('path_table' above) contains a row for
+every group and has the following columns:
+  - grouping_cols : The grouping columns given in the creation of the SSSP
+                  table. If there are no grouping columns, these columns
+                  will not exist and the table will have a single row.
+  - path (ARRAY)  : The shortest path from the source vertex (as specified
+                  in the SSSP execution) to the destination vertex.
+"""
+       elif message.lower() in ("example", "examples"):
+               help_string = """
+----------------------------------------------------------------------------
+                                EXAMPLES
+----------------------------------------------------------------------------
+-- Create a graph, represented as vertex and edge tables.
+DROP TABLE IF EXISTS vertex,edge,out,out_summary,out_path;
+CREATE TABLE vertex(
+        id INTEGER
+        );
+CREATE TABLE edge(
+        src INTEGER,
+        dest INTEGER,
+        weight DOUBLE PRECISION
+);
+
+INSERT INTO vertex VALUES
+(0),
+(1),
+(2),
+(3),
+(4),
+(5),
+(6),
+(7)
+;
+INSERT INTO edge VALUES
+(0, 1, 1),
+(0, 2, 1),
+(0, 4, 10),
+(1, 2, 2),
+(1, 3, 10),
+(2, 3, 1),
+(2, 5, 1),
+(2, 6, 3),
+(3, 0, 1),
+(4, 0, -2),
+(5, 6, 1),
+(6, 7, 1)
+;
+
+-- Compute the SSSP:
+DROP TABLE IF EXISTS out, out_summary;
+SELECT madlib.graph_sssp(
+       'vertex',                            -- Vertex table
+       'id',                                -- Vertex id column
+       'edge',                              -- Edge table
+       'src=src, dest=dest, weight=weight', -- Comma delimited string of edge 
arguments
+        0,                                  -- The source vertex
+       'out'                                -- Output table of SSSP
+);
+-- View the SSSP costs for every vertex:
+SELECT * FROM out ORDER BY id;
+
+-- View the actual shortest path for a vertex:
+SELECT graph_sssp_get_path('out',5,'out_path');
+SELECT * FROM out_path;
+
+-- Create a graph with 2 groups:
+DROP TABLE IF EXISTS edge_gr;
+CREATE TABLE edge_gr AS
+(
+  SELECT *, 0 AS grp FROM edge
+  UNION
+  SELECT *, 1 AS grp FROM edge WHERE src < 6 AND dest < 6
+);
+INSERT INTO edge_gr VALUES
+(4,5,-20,1);
+
+-- Find SSSP for all groups:
+DROP TABLE IF EXISTS out_gr, out_gr_summary;
+SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
 """
-    else:
-        help_string = "No such option. Use {schema_madlib}.graph_sssp()"
+       else:
+               help_string = "No such option. Use {schema_madlib}.graph_sssp()"
 
-    return help_string.format(schema_madlib=schema_madlib,
-       graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
+       return help_string.format(schema_madlib=schema_madlib,
+               graph_usage=get_graph_usage(schema_madlib, 'graph_sssp',
     """source_vertex INT,  -- The source vertex id for the algorithm to start.
-    out_table     TEXT  -- Name of the table to store the result of SSSP."""))
+    out_table     TEXT, -- Name of the table to store the result of SSSP.
+    grouping_cols TEXT  -- The list of grouping columns."""))
 # ---------------------------------------------------------------------
-

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/graph/sssp.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/sssp.sql_in 
b/src/ports/postgres/modules/graph/sssp.sql_in
index 7f89823..be433dc 100644
--- a/src/ports/postgres/modules/graph/sssp.sql_in
+++ b/src/ports/postgres/modules/graph/sssp.sql_in
@@ -55,7 +55,8 @@ graph_sssp( vertex_table,
             edge_table,
             edge_args,
             source_vertex,
-            out_table
+            out_table,
+            grouping_cols
           )
 </pre>
 
@@ -89,12 +90,18 @@ exist in the 'vertex_id' column of 'vertex_table'.</dd>
 
 <dt>out_table</dt>
 <dd>TEXT. Name of the table to store the result of SSSP.
-It will contain a row for every vertex from 'vertex_table' and have
-the following columns:
+It contains a row for every vertex of every group and has
+the following columns (in addition to the grouping columns):
   - vertex_id : The id for the destination. Will use the input parameter 
'vertex_id' for column naming.
   - weight : The total weight of the shortest path from the source vertex to 
this particular vertex.
-  Will use the input parameter (weight) for column naming.
-  - parent : The parent of this vertex in the shortest path from source. Will 
use 'parent' for column naming.</dd>
+  Will use the input parameter 'weight' for column naming.
+  - parent : The parent of this vertex in the shortest path from source. Will 
use 'parent' for column naming.
+
+A summary table named \<out_table\>_summary is also created. This is an internal 
table that keeps a record of the input parameters and is used by the path 
function described below.
+</dd>
+
+<dt>grouping_cols</dt>
+<dd>TEXT, default = NULL. List of columns used to group the input into 
discrete subgraphs. These columns must exist in the edge table. When this value 
is null, no grouping is used and a single SSSP result is generated. </dd>
 </dl>
 
 @par Path Retrieval
@@ -103,9 +110,10 @@ The path retrieval function returns the shortest path from 
the
 source vertex to a specified desination vertex.
 
 <pre class="syntax">
-graph_sssp( sssp_table,
-            dest_vertex
-          )
+graph_sssp_get_path( sssp_table,
+                     dest_vertex,
+                     path_table
+                    )
 </pre>
 
 \b Arguments
@@ -115,6 +123,14 @@ graph_sssp( sssp_table,
 
 <dt>dest_vertex</dt>
 <dd>INTEGER. The vertex that will be the destination of the desired path.</dd>
+
+<dt>path_table</dt>
+<dd>TEXT. Name of the output table that contains the path.
+It contains a row for every group and has the following columns:
+  - grouping_cols : The grouping columns given in the creation of the SSSP 
table. If there are no grouping columns, these columns will not exist and the 
table will have a single row.
+  - path (ARRAY) : The shortest path from the source vertex (as specified in 
the SSSP execution) to the destination vertex.
+</dd>
+
 </dl>
 
 @anchor notes
@@ -167,7 +183,7 @@ INSERT INTO edge VALUES
 
 -# Calculate the shortest paths from vertex 0:
 <pre class="syntax">
-DROP TABLE IF EXISTS out;
+DROP TABLE IF EXISTS out, out_summary;
 SELECT madlib.graph_sssp(
                          'vertex',      -- Vertex table
                          NULL,          -- Vertix id column (NULL means use 
default naming)
@@ -191,14 +207,16 @@ SELECT * FROM out ORDER BY id;
 (8 rows)
 </pre>
 
--# Get the shortest path to vertex 6:
+-# Get the shortest path to vertex 5:
 <pre class="syntax">
-SELECT madlib.graph_sssp_get_path('out',6) AS spath;
+DROP TABLE IF EXISTS out_path;
+SELECT madlib.graph_sssp_get_path('out',5,'out_path');
+SELECT * FROM out_path;
 </pre>
 <pre class="result">
-   spath
-\-----------
- {0,2,5,6}
+  path
+\---------
+ {0,2,5}
 </pre>
 
 -# Now let's do a similar example except using
@@ -212,10 +230,10 @@ CREATE TABLE edge_alt AS SELECT src AS e_src, dest, 
weight AS e_weight FROM edge
 
 -# Get the shortest path from vertex 1:
 <pre class="syntax">
-DROP TABLE IF EXISTS out_alt;
+DROP TABLE IF EXISTS out_alt, out_alt_summary;
 SELECT madlib.graph_sssp(
                          'vertex_alt',                  -- Vertex table
-                         'v_id',                        -- Vertix id column 
(NULL means use default naming)
+                         'v_id',                        -- Vertex id column 
(NULL means use default naming)
                          'edge_alt',                    -- Edge table
                          'src=e_src, weight=e_weight',  -- Edge arguments 
(NULL means use default naming)
                          1,                             -- Source vertex for 
path calculation
@@ -236,6 +254,65 @@ SELECT * FROM out_alt ORDER BY v_id;
 (8 rows)
 </pre>
 
+-# Create a graph with 2 groups:
+<pre class="syntax">
+DROP TABLE IF EXISTS edge_gr;
+CREATE TABLE edge_gr AS
+(
+  SELECT *, 0 AS grp FROM edge
+  UNION
+  SELECT *, 1 AS grp FROM edge WHERE src < 6 AND dest < 6
+);
+INSERT INTO edge_gr VALUES
+(4,5,-20,1);
+</pre>
+
+-# Find SSSP for all groups
+<pre class="syntax">
+DROP TABLE IF EXISTS out_gr, out_gr_summary;
+SELECT madlib.graph_sssp(
+                         'vertex',      -- Vertex table
+                         NULL,          -- Vertex id column (NULL means use 
default naming)
+                         'edge_gr',     -- Edge table
+                         NULL,          -- Edge arguments (NULL means use 
default naming)
+                         0,             -- Source vertex for path calculation
+                         'out_gr',      -- Output table of shortest paths
+                         'grp'          -- Grouping columns
+);
+SELECT * FROM out_gr ORDER BY grp,id;
+</pre>
+<pre class="result">
+ grp | id | weight | parent
+-----+----+--------+--------
+   0 |  0 |      0 |      0
+   0 |  1 |      1 |      0
+   0 |  2 |      1 |      0
+   0 |  3 |      2 |      2
+   0 |  4 |     10 |      0
+   0 |  5 |      2 |      2
+   0 |  6 |      3 |      5
+   0 |  7 |      4 |      6
+   1 |  0 |      0 |      0
+   1 |  1 |      1 |      0
+   1 |  2 |      1 |      0
+   1 |  3 |      2 |      2
+   1 |  4 |     10 |      0
+   1 |  5 |    -10 |      4
+</pre>
+
+-# Find the path to vertex 5 in every group
+<pre class="syntax">
+DROP TABLE IF EXISTS out_gr_path;
+SELECT madlib.graph_sssp_get_path('out_gr',5,'out_gr_path');
+SELECT * FROM out_gr_path ORDER BY grp;
+</pre>
+<pre class="result">
+ grp |  path
+-----+---------
+   0 | {0,2,5}
+   1 | {0,4,5}
+</pre>
+
 @anchor literature
 @par Literature
 
@@ -253,21 +330,36 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp(
     edge_table              TEXT,
     edge_args               TEXT,
     source_vertex           INT,
-    out_table               TEXT
+    out_table               TEXT,
+    grouping_cols           TEXT
 
 ) RETURNS VOID AS $$
     PythonFunction(graph, sssp, graph_sssp)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
 -------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp(
+    vertex_table            TEXT,
+    vertex_id               TEXT,
+    edge_table              TEXT,
+    edge_args               TEXT,
+    source_vertex           INT,
+    out_table               TEXT
+
+) RETURNS VOID AS $$
+     SELECT MADLIB_SCHEMA.graph_sssp($1, $2, $3, $4, $5, $6, NULL);
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
+-------------------------------------------------------------------------
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.graph_sssp_get_path(
     sssp_table             TEXT,
-    dest_vertex            INT
+    dest_vertex            INT,
+    path_table             TEXT
 
-) RETURNS INT[] AS $$
+) RETURNS VOID AS $$
     PythonFunction(graph, sssp, graph_sssp_get_path)
 $$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `');
+m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `');
 -------------------------------------------------------------------------
 
 -- Online help

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/graph/test/sssp.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/graph/test/sssp.sql_in 
b/src/ports/postgres/modules/graph/test/sssp.sql_in
index e2342c5..c3545c2 100644
--- a/src/ports/postgres/modules/graph/test/sssp.sql_in
+++ b/src/ports/postgres/modules/graph/test/sssp.sql_in
@@ -20,7 +20,10 @@
  *//* ----------------------------------------------------------------------- 
*/
 
 
-DROP TABLE IF EXISTS vertex,edge,out,vertex_alt,edge_alt,out_alt;
+DROP TABLE IF EXISTS vertex,edge,out,out_summary,out_path,
+       vertex_alt,edge_alt,out_alt,out_alt_summary,
+       edge_gr,out_gr,out_gr_summary,out_gr_path,
+       edge_gr2, out_gr2, out_gr2_summary;
 
 
 CREATE TABLE vertex(
@@ -30,7 +33,7 @@ CREATE TABLE vertex(
 CREATE TABLE edge(
                   src INTEGER,
                   dest INTEGER,
-                  weight INTEGER
+                  weight DOUBLE PRECISION
                 );
 
 INSERT INTO vertex VALUES
@@ -62,17 +65,69 @@ SELECT graph_sssp('vertex',NULL,'edge',NULL,0,'out');
 
 SELECT * FROM out;
 
-SELECT assert(weight = 3, 'Wrong output in graph (SSSP)') FROM out WHERE id = 
6;
-SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)') FROM out WHERE id = 
6;
+SELECT assert(weight = 3, 'Wrong output in graph (SSSP)')
+       FROM out WHERE id = 6;
+SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)')
+       FROM out WHERE id = 6;
 
-SELECT graph_sssp_get_path('out',6);
+SELECT graph_sssp_get_path('out',6,'out_path');
 
-CREATE TABLE vertex_alt AS SELECT id AS v_id FROM vertex;
-CREATE TABLE edge_alt AS SELECT src AS e_src, dest, weight AS e_weight FROM 
edge;
+CREATE TABLE vertex_alt AS SELECT id AS v_id
+       FROM vertex;
+CREATE TABLE edge_alt AS SELECT src AS e_src, dest, weight AS e_weight
+       FROM edge;
 
-SELECT graph_sssp('vertex_alt','v_id','edge_alt','src=e_src, 
weight=e_weight',1,'out_alt');
+SELECT graph_sssp('vertex_alt','v_id','edge_alt','src=e_src, weight=e_weight'
+       ,1,'out_alt');
 
 SELECT * FROM out_alt;
 
-SELECT assert(e_weight = 4, 'Wrong output in graph (SSSP)') FROM out_alt WHERE 
v_id = 6;
-SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)') FROM out_alt WHERE 
v_id = 6;
+SELECT assert(e_weight = 4, 'Wrong output in graph (SSSP)')
+       FROM out_alt WHERE v_id = 6;
+SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)')
+       FROM out_alt WHERE v_id = 6;
+
+CREATE TABLE edge_gr AS
+(      SELECT *, 0 AS grp FROM edge
+       UNION
+       SELECT *, 1 AS grp FROM edge WHERE src < 6 AND dest < 6
+       UNION
+       SELECT *, 2 AS grp FROM edge WHERE src < 6 AND dest < 6
+);
+
+INSERT INTO edge_gr VALUES
+(7,NULL,NULL,1),
+(4,0,-20,2);
+
+SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp');
+
+SELECT assert(weight = 3, 'Wrong output in graph (SSSP)')
+       FROM out_gr WHERE id = 6 AND grp = 0;
+SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)')
+       FROM out_gr WHERE id = 6 AND grp = 0;
+
+SELECT assert(weight = 2, 'Wrong output in graph (SSSP)')
+       FROM out_gr WHERE id = 5 AND grp = 1;
+SELECT assert(parent = 2, 'Wrong parent in graph (SSSP)')
+       FROM out_gr WHERE id = 5 AND grp = 1;
+
+SELECT assert(weight = 'Infinity', 'Wrong output in graph (SSSP)')
+       FROM out_gr WHERE id = 7 AND grp = 1;
+
+SELECT graph_sssp_get_path('out_gr',5,'out_gr_path');
+
+CREATE TABLE edge_gr2 AS
+(      SELECT *, 0 AS grp1, 0 AS grp2 FROM edge
+       UNION
+       SELECT *, 1 AS grp1, 0 AS grp2 FROM edge WHERE src < 6 AND dest < 6
+       UNION
+       SELECT *, 1 AS grp1, 1 AS grp2 FROM edge WHERE src < 6 AND dest < 6
+);
+
+SELECT graph_sssp('vertex',NULL,'edge_gr2',NULL,0,'out_gr2','grp1,grp2');
+
+
+SELECT assert(weight = 3, 'Wrong output in graph (SSSP)')
+       FROM out_gr2 WHERE id = 6 AND grp1 = 0 AND grp2 = 0;
+SELECT assert(parent = 5, 'Wrong parent in graph (SSSP)')
+       FROM out_gr2 WHERE id = 6 AND grp1 = 0 AND grp2 = 0;

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/pca/pca.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/pca/pca.py_in 
b/src/ports/postgres/modules/pca/pca.py_in
index 196c558..680e9f6 100644
--- a/src/ports/postgres/modules/pca/pca.py_in
+++ b/src/ports/postgres/modules/pca/pca.py_in
@@ -144,16 +144,16 @@ def pca_wrap(schema_madlib, source_table, pc_table, 
row_id,
         )
         """.format(pc_table=pc_table, 
grouping_cols_clause=grouping_cols_clause))
     pc_table_mean = add_postfix(pc_table, "_mean")
+    plpy.execute("DROP TABLE IF EXISTS {0}".format(pc_table_mean))
     plpy.execute("""
-        DROP TABLE IF EXISTS {pc_table_mean};
         CREATE TABLE {pc_table_mean} (
             column_mean     double precision[]
             {grouping_cols_clause}
         )
         """.format(pc_table_mean=pc_table_mean, 
grouping_cols_clause=grouping_cols_clause))
     if result_summary_table:
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(result_summary_table))
         plpy.execute("""
-                DROP TABLE IF EXISTS {0};
                 CREATE TABLE {0} (
                 rows_used               INTEGER,
                 "exec_time (ms)"        numeric,
@@ -947,7 +947,7 @@ SELECT {schema_madlib}.pca_train( 'mat',
           'id',
           3
     );
-    
+
 SELECT * FROM result_table ORDER BY row_id;
 
 DROP TABLE IF EXISTS mat_group;

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/stats/test/chi2_test.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/stats/test/chi2_test.sql_in 
b/src/ports/postgres/modules/stats/test/chi2_test.sql_in
index c49d996..62648a0 100644
--- a/src/ports/postgres/modules/stats/test/chi2_test.sql_in
+++ b/src/ports/postgres/modules/stats/test/chi2_test.sql_in
@@ -58,7 +58,7 @@ CREATE TABLE chi2_independence_est_1 AS
 SELECT (chi2_gof_test(observed, expected, deg_freedom)).*
 FROM (
     SELECT
-        observed,
+        id_x,id_y,observed,
         sum(observed) OVER (PARTITION BY id_x)::DOUBLE PRECISION
             * sum(observed) OVER (PARTITION BY id_y) AS expected
     FROM chi2_test_friendly_unpivoted

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/8faf6226/src/ports/postgres/modules/validation/internal/cross_validation.py_in
----------------------------------------------------------------------
diff --git 
a/src/ports/postgres/modules/validation/internal/cross_validation.py_in 
b/src/ports/postgres/modules/validation/internal/cross_validation.py_in
index c1b2561..11cde2f 100644
--- a/src/ports/postgres/modules/validation/internal/cross_validation.py_in
+++ b/src/ports/postgres/modules/validation/internal/cross_validation.py_in
@@ -200,9 +200,9 @@ def _cv_copy_data(rel_origin, dependent_varname,
     """
     """
     target_col, features_col = 'y', 'x'
+    plpy.execute("drop table if exists {0}".format(rel_copied))
     plpy.execute("""
         select setseed(0.5);
-        drop table if exists {rel_copied};
         create temp table {rel_copied} as
             select
                 row_number() over (order by random()) as {random_id},
@@ -233,15 +233,15 @@ def _cv_split_data(rel_source, col_data, col_id, row_num,
     # which corresponds to rows outside of [start_row, end_row).
     # Extract the validation part of data,
     # which corresponds to rows inside of [start_row, end_row).
+    plpy.execute("drop view if exists {rel_train}".format(**kwargs))
     plpy.execute("""
-        drop view if exists {rel_train};
         create temp view {rel_train} as
             select {col_id}, {col_string} from {rel_source}
             where {col_id} < {start_row}
                  or {col_id} >= {end_row}
         """.format(**kwargs))
+    plpy.execute("drop view if exists {rel_valid}".format(**kwargs))
     plpy.execute("""
-        drop view if exists {rel_valid};
         create temp view {rel_valid} as
             select {col_id}, {col_string} from {rel_source}
             where {col_id} >= {start_row}

Reply via email to