Repository: incubator-madlib Updated Branches: refs/heads/master 8c9b955cd -> d487df3c4
Single source shortest path function for graphs using the Bellman-Ford algorithm [1].
Args: @@ -63,383 +64,382 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table, [1] https://en.wikipedia.org/wiki/Bellman-Ford_algorithm """ - with MinWarning("warning"): - - INT_MAX = 2147483647 - INFINITY = "'Infinity'" - EPSILON = 0.000001 - - message = unique_string(desp='message') - - oldupdate = unique_string(desp='oldupdate') - newupdate = unique_string(desp='newupdate') - - params_types = {'src': str, 'dest': str, 'weight': str} - default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} - edge_params = extract_keyvalue_params(edge_args, - params_types, - default_args) - - # Prepare the input for recording in the summary table - if vertex_id is None: - v_st= "NULL" - vertex_id = "id" - else: - v_st = vertex_id - if edge_args is None: - e_st = "NULL" - else: - e_st = edge_args - if grouping_cols is None: - g_st = "NULL" - glist = None - else: - g_st = grouping_cols - glist = split_quoted_delimited_str(grouping_cols) - - src = edge_params["src"] - dest = edge_params["dest"] - weight = edge_params["weight"] - - distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) - local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY (id)"!>) - - is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) - _validate_sssp(vertex_table, vertex_id, edge_table, - edge_params, source_vertex, out_table, glist) - - plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format( - message,oldupdate,newupdate)) - - # Initialize grouping related variables - comma_grp = "" - comma_grp_e = "" - comma_grp_m = "" - grp_comma = "" - checkg_oo = "" - checkg_eo = "" - checkg_ex = "" - checkg_om = "" - group_by = "" - - if grouping_cols is not None: - comma_grp = " , " + grouping_cols - group_by = " , " + _grp_from_table(edge_table,glist) - comma_grp_e = " , " + _grp_from_table(edge_table,glist) - comma_grp_m = " , " + _grp_from_table("message",glist) - grp_comma = grouping_cols + " , " - - checkg_oo_sub = 
_check_groups(out_table,"oldupdate",glist) - checkg_oo = " AND " + checkg_oo_sub - checkg_eo = " AND " + _check_groups(edge_table,"oldupdate",glist) - checkg_ex = " AND " + _check_groups(edge_table,"x",glist) - checkg_om = " AND " + _check_groups("out_table","message",glist) - - w_type = get_expr_type(weight,edge_table).lower() - init_w = INT_MAX - if w_type in ['real','double precision','float8']: - init_w = INFINITY - - # We keep a table of every vertex, the minimum cost to that destination - # seen so far and the parent to this vertex in the associated shortest - # path. This table will be updated throughout the execution. - plpy.execute( - """ CREATE TABLE {out_table} AS ( SELECT - {grp_comma} {src} AS {vertex_id}, {weight}, - {src} AS parent FROM {edge_table} LIMIT 0) - {distribution} """.format(**locals())) - - # We keep a summary table to keep track of the parameters used for this - # SSSP run. This table is used in the path finding function to eliminate - # the need for repetition. - plpy.execute( """ CREATE TABLE {out_table}_summary ( - vertex_table TEXT, - vertex_id TEXT, - edge_table TEXT, - edge_args TEXT, - source_vertex INTEGER, - out_table TEXT, - grouping_cols TEXT) - """.format(**locals())) - plpy.execute( """ INSERT INTO {out_table}_summary VALUES - ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', - {source_vertex}, '{out_table}', '{g_st}') - """.format(**locals())) - - # We keep 2 update tables and alternate them during the execution. - # This is necessary since we need to know which vertices are updated in - # the previous iteration to calculate the next set of updates. 
- plpy.execute( - """ CREATE TEMP TABLE {oldupdate} AS ( SELECT - {src} AS id, {weight}, - {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0) - {local_distribution} - """.format(**locals())) - plpy.execute( - """ CREATE TEMP TABLE {newupdate} AS ( SELECT - {src} AS id, {weight}, - {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0) - {local_distribution} - """.format(**locals())) - - # Since HAWQ does not allow us to update, we create a new table and - # rename at every iteration. - if is_hawq: - temp_table = unique_string(desp='temp') - sql =""" CREATE TABLE {temp_table} AS ( SELECT * FROM {out_table} ) - {distribution} """ - plpy.execute(sql.format(**locals())) - - # GPDB and HAWQ have distributed by clauses to help them with indexing. - # For Postgres we add the indices manually. - sql_index = m4_ifdef(<!__POSTGRESQL__!>, - <!""" CREATE INDEX ON {out_table} ({vertex_id}); - CREATE INDEX ON {oldupdate} (id); - CREATE INDEX ON {newupdate} (id); - """.format(**locals())!>, - <!''!>) - plpy.execute(sql_index) - - # The initialization step is quite different when grouping is involved - # since not every group (subgraph) will have the same set of vertices. - - # Example: - # Assume there are two grouping columns g1 and g2 - # g1 values are 0 and 1. g2 values are 5 and 6 - if grouping_cols is not None: - - distinct_grp_table = unique_string(desp='grp') - plpy.execute(""" DROP TABLE IF EXISTS {distinct_grp_table} """. - format(**locals())) - plpy.execute( """ CREATE TEMP TABLE {distinct_grp_table} AS - SELECT DISTINCT {grouping_cols} FROM {edge_table} """. 
- format(**locals())) - subq = unique_string(desp='subquery') - - checkg_ds_sub = _check_groups(distinct_grp_table,subq,glist) - grp_d_comma = _grp_from_table(distinct_grp_table,glist) +"," - - plpy.execute( - """ INSERT INTO {out_table} - SELECT {grp_d_comma} {vertex_id} AS {vertex_id}, - {init_w} AS {weight}, NULL::INT AS parent - FROM {distinct_grp_table} INNER JOIN - ( - SELECT {src} AS {vertex_id} {comma_grp} - FROM {edge_table} - UNION - SELECT {dest} AS {vertex_id} {comma_grp} - FROM {edge_table} - ) {subq} ON ({checkg_ds_sub}) - WHERE {vertex_id} IS NOT NULL - """.format(**locals())) - - plpy.execute( - """ INSERT INTO {oldupdate} - SELECT {source_vertex}, 0, {source_vertex}, - {grouping_cols} - FROM {distinct_grp_table} - """.format(**locals())) - - # The maximum number of vertices for any group. - # Used for determining negative cycles. - v_cnt = plpy.execute( - """ SELECT max(count) as max FROM ( - SELECT count({vertex_id}) AS count - FROM {out_table} - GROUP BY {grouping_cols}) x - """.format(**locals()))[0]['max'] - plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table)) - else: - plpy.execute( - """ INSERT INTO {out_table} - SELECT {vertex_id} AS {vertex_id}, - {init_w} AS {weight}, - NULL AS parent - FROM {vertex_table} - WHERE {vertex_id} IS NOT NULL - """.format(**locals())) - - # The source can be reached with 0 cost and it has itself as the - # parent. - plpy.execute( - """ INSERT INTO {oldupdate} - VALUES({source_vertex},0,{source_vertex}) - """.format(**locals())) - - v_cnt = plpy.execute( - """ SELECT count(*) FROM {vertex_table} - WHERE {vertex_id} IS NOT NULL - """.format(**locals()))[0]['count'] - - for i in range(0,v_cnt+1): - - # Apply the updates calculated in the last iteration. 
- if is_hawq: - sql = """ - TRUNCATE TABLE {temp_table}; - INSERT INTO {temp_table} - SELECT * - FROM {out_table} - WHERE NOT EXISTS ( - SELECT 1 - FROM {oldupdate} as oldupdate - WHERE {out_table}.{vertex_id} = oldupdate.id - {checkg_oo}) - UNION - SELECT {grp_comma} id, {weight}, parent FROM {oldupdate}; - """ - plpy.execute(sql.format(**locals())) - plpy.execute("DROP TABLE {0}".format(out_table)) - plpy.execute("ALTER TABLE {0} RENAME TO {1}". - format(temp_table,out_table)) - sql = """ CREATE TABLE {temp_table} AS ( - SELECT * FROM {out_table} LIMIT 0) - {distribution};""" - plpy.execute(sql.format(**locals())) - ret = plpy.execute("SELECT id FROM {0} LIMIT 1". - format(oldupdate)) - else: - sql = """ - UPDATE {out_table} SET - {weight}=oldupdate.{weight}, - parent=oldupdate.parent - FROM - {oldupdate} AS oldupdate - WHERE - {out_table}.{vertex_id}=oldupdate.id AND - {out_table}.{weight}>oldupdate.{weight} {checkg_oo} - """ - ret = plpy.execute(sql.format(**locals())) - - if ret.nrows() == 0: - break - - plpy.execute("TRUNCATE TABLE {0}".format(newupdate)) - - # 'oldupdate' table has the update info from the last iteration - - # Consider every edge that has an updated source - # From these edges: - # For every destination vertex, find the min total cost to reach. - # Note that, just calling an aggregate function with group by won't - # let us store the src field of the edge (needed for the parent). - # This is why we need the 'x'; it gives a list of destinations and - # associated min values. Using these values, we identify which edge - # is selected. - - # Since using '=' with floats is dangerous we use an epsilon value - # for comparison. - - # Once we have a list of edges and values (stores as 'message'), - # we check if these values are lower than the existing shortest - # path values. 
- - sql = (""" INSERT INTO {newupdate} - SELECT DISTINCT ON (message.id {comma_grp}) - message.id AS id, - message.{weight} AS {weight}, - message.parent AS parent {comma_grp_m} - FROM {out_table} AS out_table INNER JOIN - ( - SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight}, - oldupdate.id AS parent {comma_grp_e} - FROM {oldupdate} AS oldupdate INNER JOIN - {edge_table} ON - ({edge_table}.{src} = oldupdate.id {checkg_eo}) - INNER JOIN - ( - SELECT {edge_table}.{dest} AS id, - min(oldupdate.{weight} + - {edge_table}.{weight}) AS {weight} {comma_grp_e} - FROM {oldupdate} AS oldupdate INNER JOIN - {edge_table} ON - ({edge_table}.{src}=oldupdate.id {checkg_eo}) - GROUP BY {edge_table}.{dest} {comma_grp_e} - ) x - ON ({edge_table}.{dest} = x.id {checkg_ex} ) - WHERE ABS(oldupdate.{weight} + {edge_table}.{weight} - - x.{weight}) < {EPSILON} - ) message - ON (message.id = out_table.{vertex_id} {checkg_om}) - WHERE message.{weight}<out_table.{weight} - """.format(**locals())) - - plpy.execute(sql) - - # Swap the update tables for the next iteration. - tmp = oldupdate - oldupdate = newupdate - newupdate = tmp - - plpy.execute("DROP TABLE IF EXISTS {0}".format(newupdate)) - # The algorithm should converge in less than |V| iterations. - # Otherwise there is a negative cycle in the graph. - if i == v_cnt: - if grouping_cols is None: - plpy.execute("DROP TABLE IF EXISTS {0},{1},{2}". - format(out_table, out_table+"_summary", oldupdate)) - if is_hawq: - plpy.execute("DROP TABLE IF EXISTS {0}".format(temp_table)) - plpy.error("Graph SSSP: Detected a negative cycle in the graph.") - - # It is possible that not all groups has negative cycles. - else: - - # negs is the string created by collating grouping columns. - # By looking at the oldupdate table we can see which groups - # are in a negative cycle. 
- - negs = plpy.execute( - """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp - FROM {oldupdate} - """.format(**locals()))[0]['grp'] - - # Delete the groups with negative cycles from the output table. - if is_hawq: - sql_del = """ - TRUNCATE TABLE {temp_table}; - INSERT INTO {temp_table} - SELECT * - FROM {out_table} - WHERE NOT EXISTS( - SELECT 1 - FROM {oldupdate} as oldupdate - WHERE {checkg_oo_sub} - );""" - plpy.execute(sql_del.format(**locals())) - plpy.execute("DROP TABLE {0}".format(out_table)) - plpy.execute("ALTER TABLE {0} RENAME TO {1}". - format(temp_table,out_table)) - else: - sql_del = """ DELETE FROM {out_table} - USING {oldupdate} AS oldupdate - WHERE {checkg_oo_sub}""" - plpy.execute(sql_del.format(**locals())) - - # If every group has a negative cycle, - # drop the output table as well. - if table_is_empty(out_table): - plpy.execute("DROP TABLE IF EXISTS {0},{1}". - format(out_table,out_table+"_summary")) - - plpy.warning( - """Graph SSSP: Detected a negative cycle in the """ + - """sub-graphs of following groups: {0}.""". - format(str(negs)[1:-1])) - - plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) - if is_hawq: - plpy.execute("DROP TABLE IF EXISTS {temp_table} ". 
- format(**locals())) - - return None + with MinWarning("warning"): + + INT_MAX = 2147483647 + INFINITY = "'Infinity'" + EPSILON = 0.000001 + + message = unique_string(desp='message') + + oldupdate = unique_string(desp='oldupdate') + newupdate = unique_string(desp='newupdate') + + params_types = {'src': str, 'dest': str, 'weight': str} + default_args = {'src': 'src', 'dest': 'dest', 'weight': 'weight'} + edge_params = extract_keyvalue_params(edge_args, + params_types, + default_args) + + # Prepare the input for recording in the summary table + if vertex_id is None: + v_st = "NULL" + vertex_id = "id" + else: + v_st = vertex_id + if edge_args is None: + e_st = "NULL" + else: + e_st = edge_args + if grouping_cols is None: + g_st = "NULL" + glist = None + else: + g_st = grouping_cols + glist = split_quoted_delimited_str(grouping_cols) + + src = edge_params["src"] + dest = edge_params["dest"] + weight = edge_params["weight"] + + distribution = '' if is_platform_pg() else "DISTRIBUTED BY ({0})".format(vertex_id) + local_distribution = '' if is_platform_pg() else "DISTRIBUTED BY (id)" + + is_hawq = is_platform_hawq() + + _validate_sssp(vertex_table, vertex_id, edge_table, + edge_params, source_vertex, out_table, glist) + + plpy.execute(" DROP TABLE IF EXISTS {0},{1},{2}".format( + message, oldupdate, newupdate)) + + # Initialize grouping related variables + comma_grp = "" + comma_grp_e = "" + comma_grp_m = "" + grp_comma = "" + checkg_oo = "" + checkg_eo = "" + checkg_ex = "" + checkg_om = "" + group_by = "" + + if grouping_cols is not None: + comma_grp = " , " + grouping_cols + group_by = " , " + _grp_from_table(edge_table, glist) + comma_grp_e = " , " + _grp_from_table(edge_table, glist) + comma_grp_m = " , " + _grp_from_table("message", glist) + grp_comma = grouping_cols + " , " + + checkg_oo_sub = _check_groups(out_table, "oldupdate", glist) + checkg_oo = " AND " + checkg_oo_sub + checkg_eo = " AND " + _check_groups(edge_table, "oldupdate", glist) + checkg_ex = " AND 
" + _check_groups(edge_table, "x", glist) + checkg_om = " AND " + _check_groups("out_table", "message", glist) + + w_type = get_expr_type(weight, edge_table).lower() + init_w = INT_MAX + if w_type in ['real', 'double precision', 'float8']: + init_w = INFINITY + + # We keep a table of every vertex, the minimum cost to that destination + # seen so far and the parent to this vertex in the associated shortest + # path. This table will be updated throughout the execution. + plpy.execute( + """ CREATE TABLE {out_table} AS (SELECT + {grp_comma} {src} AS {vertex_id}, {weight}, + {src} AS parent FROM {edge_table} LIMIT 0) + {distribution} """.format(**locals())) + + # We keep a summary table to keep track of the parameters used for this + # SSSP run. This table is used in the path finding function to eliminate + # the need for repetition. + plpy.execute(""" CREATE TABLE {out_table}_summary ( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + source_vertex INTEGER, + out_table TEXT, + grouping_cols TEXT) + """.format(**locals())) + plpy.execute(""" INSERT INTO {out_table}_summary VALUES + ('{vertex_table}', '{v_st}', '{edge_table}', '{e_st}', + {source_vertex}, '{out_table}', '{g_st}') + """.format(**locals())) + + # We keep 2 update tables and alternate them during the execution. + # This is necessary since we need to know which vertices are updated in + # the previous iteration to calculate the next set of updates. + plpy.execute( + """ CREATE TEMP TABLE {oldupdate} AS (SELECT + {src} AS id, {weight}, + {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0) + {local_distribution} + """.format(**locals())) + plpy.execute( + """ CREATE TEMP TABLE {newupdate} AS (SELECT + {src} AS id, {weight}, + {src} AS parent {comma_grp} FROM {edge_table} LIMIT 0) + {local_distribution} + """.format(**locals())) + + # Since HAWQ does not allow us to update, we create a new table and + # rename at every iteration. 
+ if is_hawq: + temp_table = unique_string(desp='temp') + sql = """ CREATE TABLE {temp_table} AS (SELECT * FROM {out_table} ) + {distribution} """ + plpy.execute(sql.format(**locals())) + + # GPDB and HAWQ have distributed by clauses to help them with indexing. + # For Postgres we add the indices manually. + if is_platform_pg(): + plpy.execute(""" + CREATE INDEX ON {out_table} ({vertex_id}); + CREATE INDEX ON {oldupdate} (id); + CREATE INDEX ON {newupdate} (id); + """.format(**locals())) + + # The initialization step is quite different when grouping is involved + # since not every group (subgraph) will have the same set of vertices. + + # Example: + # Assume there are two grouping columns g1 and g2 + # g1 values are 0 and 1. g2 values are 5 and 6 + if grouping_cols is not None: + + distinct_grp_table = unique_string(desp='grp') + plpy.execute("DROP TABLE IF EXISTS {distinct_grp_table}". + format(**locals())) + plpy.execute(""" + CREATE TEMP TABLE {distinct_grp_table} AS + SELECT DISTINCT {grouping_cols} FROM {edge_table} + """.format(**locals())) + subq = unique_string(desp='subquery') + + checkg_ds_sub = _check_groups(distinct_grp_table, subq, glist) + grp_d_comma = _grp_from_table(distinct_grp_table, glist) + "," + + plpy.execute(""" + INSERT INTO {out_table} + SELECT {grp_d_comma} {vertex_id} AS {vertex_id}, + {init_w} AS {weight}, NULL::INT AS parent + FROM {distinct_grp_table} INNER JOIN + ( + SELECT {src} AS {vertex_id} {comma_grp} + FROM {edge_table} + UNION + SELECT {dest} AS {vertex_id} {comma_grp} + FROM {edge_table} + ) {subq} ON ({checkg_ds_sub}) + WHERE {vertex_id} IS NOT NULL + """.format(**locals())) + + plpy.execute(""" + INSERT INTO {oldupdate} + SELECT {source_vertex}, 0, {source_vertex}, + {grouping_cols} + FROM {distinct_grp_table} + """.format(**locals())) + + # The maximum number of vertices for any group. + # Used for determining negative cycles. 
+ v_cnt = plpy.execute(""" + SELECT max(count) as max FROM ( + SELECT count({vertex_id}) AS count + FROM {out_table} + GROUP BY {grouping_cols}) x + """.format(**locals()))[0]['max'] + plpy.execute("DROP TABLE IF EXISTS {0}".format(distinct_grp_table)) + else: + plpy.execute(""" + INSERT INTO {out_table} + SELECT {vertex_id} AS {vertex_id}, + {init_w} AS {weight}, + NULL AS parent + FROM {vertex_table} + WHERE {vertex_id} IS NOT NULL + """.format(**locals())) + + # The source can be reached with 0 cost and it has itself as the + # parent. + plpy.execute(""" + INSERT INTO {oldupdate} + VALUES({source_vertex},0,{source_vertex}) + """.format(**locals())) + + v_cnt = plpy.execute(""" + SELECT count(*) FROM {vertex_table} + WHERE {vertex_id} IS NOT NULL + """.format(**locals()))[0]['count'] + + for i in range(0, v_cnt + 1): + + # Apply the updates calculated in the last iteration. + if is_hawq: + sql = """ + TRUNCATE TABLE {temp_table}; + INSERT INTO {temp_table} + SELECT * + FROM {out_table} + WHERE NOT EXISTS ( + SELECT 1 + FROM {oldupdate} as oldupdate + WHERE {out_table}.{vertex_id} = oldupdate.id + {checkg_oo}) + UNION + SELECT {grp_comma} id, {weight}, parent FROM {oldupdate}; + """ + plpy.execute(sql.format(**locals())) + plpy.execute("DROP TABLE {0}".format(out_table)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}". + format(temp_table, out_table)) + sql = """ CREATE TABLE {temp_table} AS ( + SELECT * FROM {out_table} LIMIT 0) + {distribution};""" + plpy.execute(sql.format(**locals())) + ret = plpy.execute("SELECT id FROM {0} LIMIT 1". 
+            # Once we have a list of edges and values (stored as 'message'),
+            # It is possible that not all groups have negative cycles.
+ + negs = plpy.execute( + """ SELECT array_agg(DISTINCT ({grouping_cols})) AS grp + FROM {oldupdate} + """.format(**locals()))[0]['grp'] + + # Delete the groups with negative cycles from the output table. + if is_hawq: + sql_del = """ + TRUNCATE TABLE {temp_table}; + INSERT INTO {temp_table} + SELECT * + FROM {out_table} + WHERE NOT EXISTS( + SELECT 1 + FROM {oldupdate} as oldupdate + WHERE {checkg_oo_sub} + );""" + plpy.execute(sql_del.format(**locals())) + plpy.execute("DROP TABLE {0}".format(out_table)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}". + format(temp_table, out_table)) + else: + sql_del = """ DELETE FROM {out_table} + USING {oldupdate} AS oldupdate + WHERE {checkg_oo_sub}""" + plpy.execute(sql_del.format(**locals())) + + # If every group has a negative cycle, + # drop the output table as well. + if table_is_empty(out_table): + plpy.execute("DROP TABLE IF EXISTS {0},{1}". + format(out_table, out_table + "_summary")) + + plpy.warning( + """Graph SSSP: Detected a negative cycle in the """ + + """sub-graphs of following groups: {0}.""". + format(str(negs)[1:-1])) + + plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) + if is_hawq: + plpy.execute("DROP TABLE IF EXISTS {temp_table} ". + format(**locals())) + return None + def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table, - **kwargs): - """ + **kwargs): + """ Helper function that can be used to get the shortest path for a vertex Args: @param sssp_table Name of the table that contains the SSSP output. @@ -447,188 +447,187 @@ def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table, desired path. @param path_table Name of the output table that contains the path. 
- """ - with MinWarning("warning"): - _validate_get_path(sssp_table, dest_vertex, path_table) - - temp1_name = unique_string(desp='temp1') - temp2_name = unique_string(desp='temp2') - - select_grps = "" - check_grps_t1 = "" - check_grps_t2 = "" - grp_comma = "" - tmp = "" - - summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table)) - vertex_id = summary[0]['vertex_id'] - source_vertex = summary[0]['source_vertex'] - - if vertex_id == "NULL": - vertex_id = "id" - - grouping_cols = summary[0]['grouping_cols'] - if grouping_cols == "NULL": - grouping_cols = None - - if grouping_cols is not None: - glist = split_quoted_delimited_str(grouping_cols) - select_grps = _grp_from_table(sssp_table,glist) + " , " - check_grps_t1 = " AND " + _check_groups( - sssp_table,temp1_name,glist) - check_grps_t2 = " AND " + _check_groups( - sssp_table,temp2_name,glist) - - grp_comma = grouping_cols + " , " - - if source_vertex == dest_vertex: - plpy.execute(""" - CREATE TABLE {path_table} AS - SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path - FROM {sssp_table} WHERE {vertex_id} = {dest_vertex} - """.format(**locals())) - return - - plpy.execute( "DROP TABLE IF EXISTS {0},{1}". - format(temp1_name,temp2_name)); - out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS - SELECT {grp_comma} {sssp_table}.parent AS {vertex_id}, - ARRAY[{dest_vertex}] AS path - FROM {sssp_table} - WHERE {vertex_id} = {dest_vertex} - AND {sssp_table}.parent IS NOT NULL - """.format(**locals())) - - plpy.execute(""" - CREATE TEMP TABLE {temp2_name} AS - SELECT * FROM {temp1_name} LIMIT 0 - """.format(**locals())) - - # Follow the 'parent' chain until you reach the source. 
- while out.nrows() > 0: - - plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals())) - # If the vertex id is not the source vertex, - # Add it to the path and move to its parent - out = plpy.execute( - """ INSERT INTO {temp2_name} - SELECT {select_grps} {sssp_table}.parent AS {vertex_id}, - {sssp_table}.{vertex_id} || {temp1_name}.path AS path - FROM {sssp_table} INNER JOIN {temp1_name} ON - ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id} - {check_grps_t1}) - WHERE {source_vertex} <> {sssp_table}.{vertex_id} - """.format(**locals())) - - tmp = temp2_name - temp2_name = temp1_name - temp1_name = tmp - - tmp = check_grps_t1 - check_grps_t1 = check_grps_t2 - check_grps_t2 = tmp - - # Add the source vertex to the beginning of every path and - # add the empty arrays for the groups that don't have a path to reach - # the destination vertex - plpy.execute(""" - CREATE TABLE {path_table} AS - SELECT {grp_comma} {source_vertex} || path AS path - FROM {temp2_name} - UNION - SELECT {grp_comma} '{{}}'::INT[] AS path - FROM {sssp_table} - WHERE {vertex_id} = {dest_vertex} - AND {sssp_table}.parent IS NULL - """.format(**locals())) - - out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table)) - - if out.nrows() == 0: - plpy.error( - "Graph SSSP: Vertex {0} is not present in the SSSP table {1}". - format(dest_vertex,sssp_table)) - - plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}". 
- format(**locals())) - - return None + """ + with MinWarning("warning"): + _validate_get_path(sssp_table, dest_vertex, path_table) + + temp1_name = unique_string(desp='temp1') + temp2_name = unique_string(desp='temp2') + + select_grps = "" + check_grps_t1 = "" + check_grps_t2 = "" + grp_comma = "" + tmp = "" + + summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table)) + vertex_id = summary[0]['vertex_id'] + source_vertex = summary[0]['source_vertex'] + + if vertex_id == "NULL": + vertex_id = "id" + + grouping_cols = summary[0]['grouping_cols'] + if grouping_cols == "NULL": + grouping_cols = None + + if grouping_cols is not None: + glist = split_quoted_delimited_str(grouping_cols) + select_grps = _grp_from_table(sssp_table, glist) + " , " + check_grps_t1 = " AND " + _check_groups( + sssp_table, temp1_name, glist) + check_grps_t2 = " AND " + _check_groups( + sssp_table, temp2_name, glist) + + grp_comma = grouping_cols + " , " + + if source_vertex == dest_vertex: + plpy.execute(""" + CREATE TABLE {path_table} AS + SELECT {grp_comma} '{{{dest_vertex}}}'::INT[] AS path + FROM {sssp_table} WHERE {vertex_id} = {dest_vertex} + """.format(**locals())) + return + + plpy.execute("DROP TABLE IF EXISTS {0},{1}". + format(temp1_name, temp2_name)) + out = plpy.execute(""" CREATE TEMP TABLE {temp1_name} AS + SELECT {grp_comma} {sssp_table}.parent AS {vertex_id}, + ARRAY[{dest_vertex}] AS path + FROM {sssp_table} + WHERE {vertex_id} = {dest_vertex} + AND {sssp_table}.parent IS NOT NULL + """.format(**locals())) + + plpy.execute(""" + CREATE TEMP TABLE {temp2_name} AS + SELECT * FROM {temp1_name} LIMIT 0 + """.format(**locals())) + + # Follow the 'parent' chain until you reach the source. 
+ while out.nrows() > 0: + + plpy.execute("TRUNCATE TABLE {temp2_name}".format(**locals())) + # If the vertex id is not the source vertex, + # Add it to the path and move to its parent + out = plpy.execute( + """ INSERT INTO {temp2_name} + SELECT {select_grps} {sssp_table}.parent AS {vertex_id}, + {sssp_table}.{vertex_id} || {temp1_name}.path AS path + FROM {sssp_table} INNER JOIN {temp1_name} ON + ({sssp_table}.{vertex_id} = {temp1_name}.{vertex_id} + {check_grps_t1}) + WHERE {source_vertex} <> {sssp_table}.{vertex_id} + """.format(**locals())) + + tmp = temp2_name + temp2_name = temp1_name + temp1_name = tmp + + tmp = check_grps_t1 + check_grps_t1 = check_grps_t2 + check_grps_t2 = tmp + + # Add the source vertex to the beginning of every path and + # add the empty arrays for the groups that don't have a path to reach + # the destination vertex + plpy.execute(""" + CREATE TABLE {path_table} AS + SELECT {grp_comma} {source_vertex} || path AS path + FROM {temp2_name} + UNION + SELECT {grp_comma} '{{}}'::INT[] AS path + FROM {sssp_table} + WHERE {vertex_id} = {dest_vertex} + AND {sssp_table}.parent IS NULL + """.format(**locals())) + + out = plpy.execute("SELECT 1 FROM {0} LIMIT 1".format(path_table)) + + if out.nrows() == 0: + plpy.error( + "Graph SSSP: Vertex {0} is not present in the SSSP table {1}". + format(dest_vertex, sssp_table)) + + plpy.execute("DROP TABLE IF EXISTS {temp1_name}, {temp1_name}". + format(**locals())) + return None def _validate_sssp(vertex_table, vertex_id, edge_table, edge_params, - source_vertex, out_table, glist, **kwargs): + source_vertex, out_table, glist, **kwargs): - validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, - out_table,'SSSP') + validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + out_table, 'SSSP') - _assert(isinstance(source_vertex,int), - """Graph SSSP: Source vertex {source_vertex} has to be an integer.""". 
- format(**locals())) - src_exists = plpy.execute(""" - SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex} - """.format(**locals())) + _assert(isinstance(source_vertex, int), + """Graph SSSP: Source vertex {source_vertex} has to be an integer.""". + format(**locals())) + src_exists = plpy.execute(""" + SELECT * FROM {vertex_table} WHERE {vertex_id}={source_vertex} + """.format(**locals())) - if src_exists.nrows() == 0: - plpy.error( - """Graph SSSP: Source vertex {source_vertex} is not present in the vertex table {vertex_table}.""". - format(**locals())) + if src_exists.nrows() == 0: + plpy.error("Graph SSSP: Source vertex {source_vertex} is not present " + "in the vertex table {vertex_table}.".format(**locals())) - vt_error = plpy.execute( - """ SELECT {vertex_id} - FROM {vertex_table} - WHERE {vertex_id} IS NOT NULL - GROUP BY {vertex_id} - HAVING count(*) > 1 """.format(**locals())) + vt_error = plpy.execute(""" + SELECT {vertex_id} + FROM {vertex_table} + WHERE {vertex_id} IS NOT NULL + GROUP BY {vertex_id} + HAVING count(*) > 1 + """.format(**locals())) - if vt_error.nrows() != 0: - plpy.error( - """Graph SSSP: Source vertex table {vertex_table} contains duplicate vertex id's.""". - format(**locals())) + if vt_error.nrows() != 0: + plpy.error("Graph SSSP: Source vertex table {vertex_table} " + "contains duplicate vertex id's.".format(**locals())) - _assert(not table_exists(out_table+"_summary"), - "Graph SSSP: Output summary table already exists!") + _assert(not table_exists(out_table + "_summary"), + "Graph SSSP: Output summary table already exists!") - if glist is not None: - _assert(columns_exist_in_table(edge_table, glist), - """Graph SSSP: Not all columns from {glist} are present in edge table ({edge_table}).""". 
- format(**locals())) + if glist is not None: + _assert(columns_exist_in_table(edge_table, glist), + "Graph SSSP: Not all columns from {glist} are present in " + "edge table ({edge_table}).".format(**locals())) + return None - return None def _validate_get_path(sssp_table, dest_vertex, path_table, **kwargs): - _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''), - "Graph SSSP: Invalid SSSP table name!") - _assert(table_exists(sssp_table), - "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table)) - _assert(not table_is_empty(sssp_table), - "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table)) + _assert(sssp_table and sssp_table.strip().lower() not in ('null', ''), + "Graph SSSP: Invalid SSSP table name!") + _assert(table_exists(sssp_table), + "Graph SSSP: SSSP table ({0}) is missing!".format(sssp_table)) + _assert(not table_is_empty(sssp_table), + "Graph SSSP: SSSP table ({0}) is empty!".format(sssp_table)) - summary = sssp_table+"_summary" - _assert(table_exists(summary), - "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary)) - _assert(not table_is_empty(summary), - "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary)) + summary = sssp_table + "_summary" + _assert(table_exists(summary), + "Graph SSSP: SSSP summary table ({0}) is missing!".format(summary)) + _assert(not table_is_empty(summary), + "Graph SSSP: SSSP summary table ({0}) is empty!".format(summary)) - _assert(not table_exists(path_table), - "Graph SSSP: Output path table already exists!") + _assert(not table_exists(path_table), + "Graph SSSP: Output path table already exists!") + + return None - return None def graph_sssp_help(schema_madlib, message, **kwargs): - """ - Help function for graph_sssp and graph_sssp_get_path - - Args: - @param schema_madlib - @param message: string, Help message string - @param kwargs - - Returns: - String. 
Help/usage information - """ - if not message: - help_string = """ + """ + Help function for graph_sssp and graph_sssp_get_path + + Args: + @param schema_madlib + @param message: string, Help message string + @param kwargs + + Returns: + String. Help/usage information + """ + if not message: + help_string = """ ----------------------------------------------------------------------- SUMMARY ----------------------------------------------------------------------- @@ -640,8 +639,8 @@ weights of its constituent edges is minimized. For more details on function usage: SELECT {schema_madlib}.graph_sssp('usage') """ - elif message.lower() in ['usage', 'help', '?']: - help_string = """ + elif message.lower() in ['usage', 'help', '?']: + help_string = """ Given a graph and a source vertex, single source shortest path (SSSP) algorithm finds a path for every vertex such that the sum of the weights of its constituent edges is minimized. @@ -651,8 +650,8 @@ weights of its constituent edges is minimized. To retrieve the path for a specific vertex: SELECT {schema_madlib}.graph_sssp_get_path( - sssp_table TEXT, -- Name of the table that contains the SSSP output. - dest_vertex INT, -- The vertex that will be the destination of the + sssp_table TEXT, -- Name of the table that contains the SSSP output. + dest_vertex INT, -- The vertex that will be the destination of the -- desired path. path_table TEXT -- Name of the output table that contains the path. ); @@ -679,8 +678,8 @@ every group and has the following columns: - path (ARRAY) : The shortest path from the source vertex (as specified in the SSSP execution) to the destination vertex. 
""" - elif message.lower() in ("example", "examples"): - help_string = """ + elif message.lower() in ("example", "examples"): + help_string = """ ---------------------------------------------------------------------------- EXAMPLES ---------------------------------------------------------------------------- @@ -723,12 +722,12 @@ INSERT INTO edge VALUES -- Compute the SSSP: DROP TABLE IF EXISTS out; SELECT madlib.graph_sssp( - 'vertex', -- Vertex table - 'id', -- Vertix id column - 'edge', -- Edge table - 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments - 0, -- The source vertex - 'out' -- Output table of SSSP + 'vertex', -- Vertex table + 'id', -- Vertix id column + 'edge', -- Edge table + 'src=src, dest=dest, weight=weight', -- Comma delimted string of edge arguments + 0, -- The source vertex + 'out' -- Output table of SSSP ); -- View the SSSP costs for every vertex: SELECT * FROM out ORDER BY id; @@ -752,12 +751,14 @@ INSERT INTO edge_gr VALUES DROP TABLE IF EXISTS out_gr, out_gr_summary; SELECT graph_sssp('vertex',NULL,'edge_gr',NULL,0,'out_gr','grp'); """ - else: - help_string = "No such option. Use {schema_madlib}.graph_sssp()" - - return help_string.format(schema_madlib=schema_madlib, - graph_usage=get_graph_usage(schema_madlib, 'graph_sssp', - """source_vertex INT, -- The source vertex id for the algorithm to start. - out_table TEXT, -- Name of the table to store the result of SSSP. - grouping_cols TEXT -- The list of grouping columns.""")) + else: + help_string = "No such option. Use {schema_madlib}.graph_sssp()" + + common_usage_string = get_graph_usage( + schema_madlib, 'graph_sssp', + """source_vertex INT, -- The source vertex id for the algorithm to start. + out_table TEXT, -- Name of the table to store the result of SSSP. 
+ grouping_cols TEXT -- The list of grouping columns.""") + return help_string.format(schema_madlib=schema_madlib, + graph_usage=common_usage_string) # --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/graph/wcc.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in index 02cceeb..1f6a81f 100644 --- a/src/ports/postgres/modules/graph/wcc.py_in +++ b/src/ports/postgres/modules/graph/wcc.py_in @@ -31,34 +31,37 @@ import plpy from utilities.utilities import _assert from utilities.utilities import extract_keyvalue_params from utilities.utilities import unique_string, split_quoted_delimited_str -from utilities.validate_args import columns_exist_in_table, get_cols_and_types -from graph_utils import * +from utilities.validate_args import columns_exist_in_table +from utilities.utilities import is_platform_pg, is_platform_hawq +from graph_utils import validate_graph_coding, get_graph_usage -m4_changequote(`<!', `!>') def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, - edge_params, out_table, grouping_cols_list, module_name): + edge_params, out_table, grouping_cols_list, module_name): """ Function to validate input parameters for wcc """ validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, - out_table, module_name) + out_table, module_name) if grouping_cols_list: # validate the grouping columns. We currently only support grouping_cols # to be column names in the edge_table, and not expressions! 
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib), - "Weakly Connected Components error: One or more grouping columns specified do not exist!") + "Weakly Connected Components error: " + "One or more grouping columns specified do not exist!") def prefix_tablename_to_colnames(table, cols_list): return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list]) + def get_where_condition(table1, table2, cols_list): return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col) - for col in cols_list]) + for col in cols_list]) + def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, - out_table, grouping_cols, **kwargs): + out_table, grouping_cols, **kwargs): """ Function that computes the wcc @@ -78,8 +81,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, plpy.execute('SET client_min_messages TO warning') params_types = {'src': str, 'dest': str} default_args = {'src': 'src', 'dest': 'dest'} - edge_params = extract_keyvalue_params(edge_args, - params_types, default_args) + edge_params = extract_keyvalue_params(edge_args, params_types, default_args) # populate default values for optional params if null if vertex_id is None: @@ -89,7 +91,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, grouping_cols_list = split_quoted_delimited_str(grouping_cols) validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, - edge_params, out_table, grouping_cols_list, 'Weakly Connected Components') + edge_params, out_table, grouping_cols_list, + 'Weakly Connected Components') src = edge_params["src"] dest = edge_params["dest"] @@ -99,21 +102,22 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, toupdate = unique_string(desp='toupdate') temp_out_table = unique_string(desp='tempout') - distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) + distribution = '' if is_platform_pg() else "DISTRIBUTED BY 
({0})".format(vertex_id) subq_prefixed_grouping_cols = '' comma_toupdate_prefixed_grouping_cols = '' comma_oldupdate_prefixed_grouping_cols = '' old_new_update_where_condition = '' new_to_update_where_condition = '' edge_to_update_where_condition = '' - is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) + is_hawq = is_platform_hawq() INT_MAX = 2147483647 component_id = 'component_id' + grouping_cols_comma = '' if not grouping_cols else grouping_cols + ',' + if grouping_cols: - distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, - <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>) + distribution = ('' if is_platform_pg() else + "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, vertex_id)) # Update some variables useful for grouping based query strings subq = unique_string(desp='subquery') distinct_grp_table = unique_string(desp='grptable') @@ -121,18 +125,20 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, CREATE TABLE {distinct_grp_table} AS SELECT DISTINCT {grouping_cols} FROM {edge_table} """.format(**locals())) - comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate, - grouping_cols_list) - comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames( - oldupdate, grouping_cols_list) - subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq, - grouping_cols_list) - old_new_update_where_condition = ' AND ' + get_where_condition( - oldupdate, newupdate, grouping_cols_list) - new_to_update_where_condition = ' AND ' + get_where_condition( - newupdate, toupdate, grouping_cols_list) - edge_to_update_where_condition = ' AND ' + get_where_condition( - edge_table, toupdate, grouping_cols_list) + + pttc = prefix_tablename_to_colnames + gwc = get_where_condition + + comma_toupdate_prefixed_grouping_cols = ', ' + pttc(toupdate, grouping_cols_list) + comma_oldupdate_prefixed_grouping_cols = ', ' + pttc(oldupdate, grouping_cols_list) + subq_prefixed_grouping_cols = pttc(subq, 
grouping_cols_list) + old_new_update_where_condition = ' AND ' + gwc(oldupdate, newupdate, grouping_cols_list) + new_to_update_where_condition = ' AND ' + gwc(newupdate, toupdate, grouping_cols_list) + edge_to_update_where_condition = ' AND ' + gwc(edge_table, toupdate, grouping_cols_list) + join_grouping_cols = gwc(subq, distinct_grp_table, grouping_cols_list) + group_by_clause = ('' if not grouping_cols else + '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols, + subq, vertex_id)) plpy.execute(""" CREATE TABLE {newupdate} AS SELECT {subq}.{vertex_id}, @@ -148,13 +154,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, ON {join_grouping_cols} GROUP BY {group_by_clause} {distribution} - """.format(select_grouping_cols=','+subq_prefixed_grouping_cols, - join_grouping_cols=get_where_condition(subq, - distinct_grp_table, grouping_cols_list), - group_by_clause='' if not grouping_cols else - subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id), - select_grouping_cols_clause='' if not grouping_cols else - grouping_cols+', ', **locals())) + """.format(select_grouping_cols=',' + subq_prefixed_grouping_cols, + select_grouping_cols_clause=grouping_cols_comma, + **locals())) plpy.execute(""" CREATE TEMP TABLE {message} AS SELECT {vertex_id}, @@ -162,8 +164,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, {select_grouping_cols_clause} FROM {newupdate} {distribution} - """.format(select_grouping_cols_clause='' if not grouping_cols else - ', '+grouping_cols, **locals())) + """.format(select_grouping_cols_clause=grouping_cols_comma, + **locals())) else: plpy.execute(""" CREATE TABLE {newupdate} AS @@ -186,13 +188,14 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, """.format(**locals())) nodes_to_update = 1 while nodes_to_update > 0: - # This idea here is simple. Look at all the neighbors of a node, and - # assign the smallest node id among the neighbors as its component_id. 
- # The next table starts off with very high component_id (INT_MAX). The - # component_id of all nodes which obtain a smaller component_id after - # looking at its neighbors are updated in the next table. At every - # iteration update only those nodes whose component_id in the previous - # iteration are greater than what was found in the current iteration. + # Look at all the neighbors of a node, and assign the smallest node id + # among the neighbors as its component_id. The next table starts off + # with very high component_id (INT_MAX). The component_id of all nodes + # which obtain a smaller component_id after looking at its neighbors are + # updated in the next table. At every iteration update only those nodes + # whose component_id in the previous iteration are greater than what was + # found in the current iteration. + plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) plpy.execute(""" CREATE TEMP TABLE {oldupdate} AS @@ -202,10 +205,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, FROM {message} GROUP BY {group_by_clause} {vertex_id} {distribution} - """.format(grouping_cols_select='' if not grouping_cols else - ', {0}'.format(grouping_cols), group_by_clause='' - if not grouping_cols else '{0}, '.format(grouping_cols), - **locals())) + """.format(grouping_cols_select='' if not grouping_cols else ', {0}'.format(grouping_cols), + group_by_clause=grouping_cols_comma, + **locals())) plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) plpy.execute(""" @@ -236,8 +238,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, SELECT * FROM {toupdate}; """.format(**locals())) plpy.execute("DROP TABLE {0}".format(newupdate)) - plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table, - newupdate)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}". 
+ format(temp_out_table, newupdate)) plpy.execute(""" CREATE TABLE {temp_out_table} AS SELECT * FROM {newupdate} @@ -275,9 +277,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, ) AS t GROUP BY {group_by_clause} {vertex_id} """.format(select_grouping_cols='' if not grouping_cols - else ', {0}'.format(grouping_cols), group_by_clause='' - if not grouping_cols else ' {0}, '.format(grouping_cols), - **locals())) + else ', {0}'.format(grouping_cols), group_by_clause='' + if not grouping_cols else ' {0}, '.format(grouping_cols), + **locals())) plpy.execute("DROP TABLE {0}".format(oldupdate)) if grouping_cols: @@ -300,6 +302,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, if is_hawq: plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table)) + def wcc_help(schema_madlib, message, **kwargs): """ Help function for wcc @@ -315,11 +318,13 @@ def wcc_help(schema_madlib, message, **kwargs): if message is not None and \ message.lower() in ("usage", "help", "?"): help_string = "Get from method below" - help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components', + help_string = get_graph_usage( + schema_madlib, + 'Weakly Connected Components', """out_table TEXT, -- Output table of weakly connected components - grouping_col TEXT -- Comma separated column names to group on - -- (DEFAULT = NULL, no grouping) -""") + grouping_col TEXT -- Comma separated column names to group on + -- (DEFAULT = NULL, no grouping) + """) else: if message is not None and \ message.lower() in ("example", "examples"): http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/d487df3c/src/ports/postgres/modules/utilities/utilities.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in index 6a5e8f9..b28a5f3 100644 --- a/src/ports/postgres/modules/utilities/utilities.py_in +++ 
b/src/ports/postgres/modules/utilities/utilities.py_in @@ -14,32 +14,43 @@ if __name__ != "__main__": m4_changequote(`<!', `!>') +def has_function_properties(): + """ __HAS_FUNCTION_PROPERTIES__ variable defined during configure """ + return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>) + + +def is_platform_pg(): + """ __POSTGRESQL__ variable defined during configure """ + return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>) +# ------------------------------------------------------------------------------ + + +def is_platform_hawq(): + """ __HAWQ__ variable defined during configure """ + return m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) +# ------------------------------------------------------------------------------ + + def get_seg_number(): """ Find out how many primary segments exist in the distribution Might be useful for partitioning data. """ - m4_ifdef(<!__POSTGRESQL__!>, <!return 1!>, <! - return plpy.execute( - """ - SELECT count(*) from gp_segment_configuration - WHERE role = 'p' - """)[0]['count'] - !>) + if is_platform_pg(): + return 1 + else: + return plpy.execute(""" + SELECT count(*) from gp_segment_configuration + WHERE role = 'p' + """)[0]['count'] # ------------------------------------------------------------------------------ def is_orca(): - m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <! - optimizer = plpy.execute("show optimizer")[0]["optimizer"] - return True if optimizer == 'on' else False - !>, <! + if has_function_properties(): + optimizer = plpy.execute("show optimizer")[0]["optimizer"] + if optimizer == 'on': + return True return False - !>) -# ------------------------------------------------------------------------------ - - -def is_platform_pg(): - return m4_ifdef(<!__POSTGRESQL__!>, <!True!>, <!False!>) # ------------------------------------------------------------------------------