Repository: incubator-madlib Updated Branches: refs/heads/master cc4ccd682 -> 01788982d
Feature: Weakly Connected Components JIRA: MADLIB-1071 Implement a new module in graph that finds all weakly connected components of a directed graph. A weakly connected component is a subgraph where every node has a path to every other node, ignoring edge directions. This does not have grouping support yet, although the interface has it defined already. Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/01788982 Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/01788982 Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/01788982 Branch: refs/heads/master Commit: 01788982da395d7d1cef50ba99bb9dae2b2eb82b Parents: cc4ccd6 Author: Nandish Jayaram <njaya...@apache.org> Authored: Thu Jun 22 14:49:51 2017 -0700 Committer: Nandish Jayaram <njaya...@apache.org> Committed: Wed Jun 28 17:11:41 2017 -0700 ---------------------------------------------------------------------- .../postgres/modules/graph/test/wcc.sql_in | 76 +++++ src/ports/postgres/modules/graph/wcc.py_in | 329 +++++++++++++++++++ src/ports/postgres/modules/graph/wcc.sql_in | 261 +++++++++++++++ 3 files changed, 666 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/01788982/src/ports/postgres/modules/graph/test/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/test/wcc.sql_in b/src/ports/postgres/modules/graph/test/wcc.sql_in new file mode 100644 index 0000000..f9430f1 --- /dev/null +++ b/src/ports/postgres/modules/graph/test/wcc.sql_in @@ -0,0 +1,76 @@ +/* ----------------------------------------------------------------------- *//** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + *//* ----------------------------------------------------------------------- */ + +DROP TABLE IF EXISTS vertex, edge; +CREATE TABLE vertex( +vertex_id INTEGER +); +CREATE TABLE edge( +src_node INTEGER, +dest_node INTEGER, +user_id INTEGER +); +INSERT INTO vertex VALUES +(0), +(1), +(2), +(3), +(4), +(5), +(6), +(10), +(11), +(12), +(13), +(14), +(15), +(16); +INSERT INTO edge VALUES +(0, 1, 1), +(0, 2, 1), +(1, 2, 1), +(1, 3, 1), +(2, 3, 1), +(2, 5, 1), +(2, 6, 1), +(3, 0, 1), +(5, 6, 1), +(6, 3, 1), +(10, 11, 2), +(10, 12, 2), +(11, 12, 2), +(11, 13, 2), +(12, 13, 2), +(13, 10, 2), +(15, 16, 2), +(15, 14, 2); + +DROP TABLE IF EXISTS wcc_out; +SELECT weakly_connected_components( + 'vertex', + 'vertex_id', + 'edge', + 'src=src_node,dest=dest_node', + 'wcc_out'); + +SELECT assert(relative_error(count(distinct component_id), 4) < 0.00001, + 'Weakly Connected Components: Number of components found is not 4.' 
+ ) FROM wcc_out; http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/01788982/src/ports/postgres/modules/graph/wcc.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in new file mode 100644 index 0000000..d07ac05 --- /dev/null +++ b/src/ports/postgres/modules/graph/wcc.py_in @@ -0,0 +1,329 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Weakly Connected Components + +# Please refer to the wcc.sql_in file for the documentation + +""" +@file wcc.py_in + +@namespace graph +""" + +import plpy +from utilities.control import MinWarning +from utilities.utilities import _assert +from utilities.utilities import extract_keyvalue_params +from utilities.utilities import unique_string, split_quoted_delimited_str +from utilities.validate_args import columns_exist_in_table, get_cols_and_types +from graph_utils import * + +m4_changequote(`<!', `!>') + + +def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, + edge_params, out_table, grouping_cols_list, module_name): + """ + Function to validate input parameters for wcc + """ + validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params, + out_table, module_name) + if grouping_cols_list: + # validate the grouping columns. We currently only support grouping_cols + # to be column names in the edge_table, and not expressions! + _assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib), + "Weakly Connected Components error: One or more grouping columns specified do not exist!") + with MinWarning("warning"): + plpy.warning("Grouping is not currently supported at the moment.") + +def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, + out_table, grouping_cols, **kwargs): + """ + Function that computes the wcc + + Args: + @param vertex_table + @param vertex_id + @param edge_table + @param dest_vertex + @param out_table + @param grouping_cols + """ + old_msg_level = plpy.execute(""" + SELECT setting + FROM pg_settings + WHERE name='client_min_messages' + """)[0]['setting'] + plpy.execute('SET client_min_messages TO warning') + params_types = {'src': str, 'dest': str} + default_args = {'src': 'src', 'dest': 'dest'} + edge_params = extract_keyvalue_params(edge_args, + params_types, default_args) + + # populate default values for optional params if null + if vertex_id is None: + vertex_id = "id" + if 
not grouping_cols: + grouping_cols = '' + + grouping_cols_list = split_quoted_delimited_str(grouping_cols) + validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, + edge_params, out_table, grouping_cols_list, 'Weakly Connected Components') + src = edge_params["src"] + dest = edge_params["dest"] + + message = unique_string(desp='message') + oldupdate = unique_string(desp='oldupdate') + newupdate = unique_string(desp='newupdate') + toupdate = unique_string(desp='toupdate') + temp_out_table = unique_string(desp='tempout') + + distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) + + is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) + + INT_MAX = 2147483647 + component_id = 'component_id' + plpy.execute(""" + CREATE TABLE {newupdate} AS + SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) + if is_hawq: + plpy.execute(""" + CREATE TABLE {temp_out_table} AS + SELECT * FROM {newupdate} + LIMIT 0 + {distribution} + """.format(**locals())) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) + nodes_to_update = 1 + while nodes_to_update > 0: + # This idea here is simple. Look at all the neighbors of a node, and + # assign the smallest node id among the neighbors as its component_id. + # The next table starts off with very high component_id (INT_MAX). The + # component_id of all nodes which obtain a smaller component_id after + # looking at its neighbors are updated in the next table. At every + # iteration update only those nodes whose component_id in the previous + # iteration are greater than what was found in the current iteration. 
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) + plpy.execute(""" + CREATE TEMP TABLE {oldupdate} AS + SELECT {message}.{vertex_id}, + MIN({message}.{component_id}) AS {component_id} + FROM {message} + GROUP BY {vertex_id} + {distribution} + """.format(**locals())) + + plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) + plpy.execute(""" + CREATE TEMP TABLE {toupdate} AS + SELECT {oldupdate}.{vertex_id}, {oldupdate}.{component_id} + FROM {oldupdate}, {newupdate} + WHERE {oldupdate}.{vertex_id}={newupdate}.{vertex_id} + AND {oldupdate}.{component_id}<{newupdate}.{component_id} + {distribution} + """.format(**locals())) + + if is_hawq: + plpy.execute(""" + TRUNCATE TABLE {temp_out_table}; + INSERT INTO {temp_out_table} + SELECT * + FROM {newupdate} + WHERE NOT EXISTS ( + SELECT * + FROM {toupdate} + WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + ) + UNION + SELECT * FROM {toupdate}; + """.format(**locals())) + plpy.execute("DROP TABLE {0}".format(newupdate)) + plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(temp_out_table, + newupdate)) + plpy.execute(""" + CREATE TABLE {temp_out_table} AS + SELECT * FROM {newupdate} + LIMIT 0 + {distribution} + """.format(**locals())) + else: + plpy.execute(""" + UPDATE {newupdate} SET + {component_id}={toupdate}.{component_id} + FROM {toupdate} + WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + """.format(**locals())) + + plpy.execute("DROP TABLE IF EXISTS {0}".format(message)) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, MIN({component_id}) AS {component_id} + FROM ( + SELECT {edge_table}.{src} AS {vertex_id}, {toupdate}.{component_id} + FROM {toupdate}, {edge_table} + WHERE {edge_table}.{dest} = {toupdate}.{vertex_id} + UNION ALL + SELECT {edge_table}.{dest} AS {vertex_id}, {toupdate}.{component_id} + FROM {toupdate}, {edge_table} + WHERE {edge_table}.{src} = {toupdate}.{vertex_id} + ) AS t + GROUP BY {vertex_id} + """.format(**locals())) + + plpy.execute("DROP 
TABLE {0}".format(oldupdate)) + nodes_to_update = plpy.execute(""" + SELECT COUNT(*) AS cnt FROM {toupdate} + """.format(**locals()))[0]["cnt"] + + plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table)) + plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3} + """.format(message, oldupdate, newupdate, toupdate)) + if is_hawq: + plpy.execute("""DROP TABLE IF EXISTS {0}""".format(temp_out_table)) + +def wcc_help(schema_madlib, message, **kwargs): + """ + Help function for wcc + + Args: + @param schema_madlib + @param message: string, Help message string + @param kwargs + + Returns: + String. Help/usage information + """ + if message is not None and \ + message.lower() in ("usage", "help", "?"): + help_string = "Get from method below" + help_string = get_graph_usage(schema_madlib, 'Weakly Connected Components', + """out_table TEXT, -- Output table of weakly connected components + grouping_col TEXT -- Comma separated column names to group on + -- (DEFAULT = NULL, no grouping) +""") + else: + if message is not None and \ + message.lower() in ("example", "examples"): + help_string = """ +---------------------------------------------------------------------------- + EXAMPLES +---------------------------------------------------------------------------- +-- Create a graph, represented as vertex and edge tables. 
+DROP TABLE IF EXISTS vertex, edge; +CREATE TABLE vertex( + id INTEGER +); +CREATE TABLE edge( + src INTEGER, + dest INTEGER, + user_id INTEGER +); +INSERT INTO vertex VALUES +(0), +(1), +(2), +(3), +(4), +(5), +(6), +(10), +(11), +(12), +(13), +(14), +(15), +(16); +INSERT INTO edge VALUES +(0, 1, 1), +(0, 2, 1), +(1, 2, 1), +(1, 3, 1), +(2, 3, 1), +(2, 5, 1), +(2, 6, 1), +(3, 0, 1), +(5, 6, 1), +(6, 3, 1), +(10, 11, 2), +(10, 12, 2), +(11, 12, 2), +(11, 13, 2), +(12, 13, 2), +(13, 10, 2), +(15, 16, 2), +(15, 14, 2); + +-- Find all weakly connected components in the graph: +DROP TABLE IF EXISTS wcc_out; +SELECT madlib.weakly_connected_components( + 'vertex', -- Vertex table + 'id', -- Vertix id column + 'edge', -- Edge table + 'src=src, dest=dest', -- Comma delimted string of edge arguments + 'wcc_out'); -- Output table of weakly connected components + +-- View the component ID associated with each vertex in the graph: +SELECT * FROM wcc_out ORDER BY component_id; + +-- Find all weakly connected components associated with each user, using the +-- grouping feature: +DROP TABLE IF EXISTS wcc_out; +SELECT madlib.weakly_connected_components( + 'vertex', -- Vertex table + 'id', -- Vertix id column + 'edge', -- Edge table + 'src=src, dest=dest', -- Comma delimted string of edge arguments + 'wcc_out', -- Output table of weakly connected components + 'user_id'); -- Grouping column + +-- View the component ID associated with each vertex within the sub-graph +-- associated with each user: +SELECT * FROM wcc_out ORDER BY user_id, component_id; +""" + else: + help_string = """ +---------------------------------------------------------------------------- + SUMMARY +---------------------------------------------------------------------------- +Given a directed graph, a weakly connected component is a sub-graph of the +original graph where all vertices are connected to each other by some path, +ignoring the direction of edges. 
In case of an undirected graph, a weakly +connected component is also a strongly connected component. +-- +For an overview on usage, run: +SELECT {schema_madlib}.weakly_connected_components('usage'); + +For some examples, run: +SELECT {schema_madlib}.weakly_connected_components('example') +-- +""" + + return help_string.format(schema_madlib=schema_madlib) +# --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/01788982/src/ports/postgres/modules/graph/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in new file mode 100644 index 0000000..af20281 --- /dev/null +++ b/src/ports/postgres/modules/graph/wcc.sql_in @@ -0,0 +1,261 @@ +/* ----------------------------------------------------------------------- *//** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + * @file wcc.sql_in + * + * @brief SQL functions for graph analytics + * @date June 2017 + * + * @sa Provides various graph algorithms. 
+ * + *//* ----------------------------------------------------------------------- */ +m4_include(`SQLCommon.m4') + + +/** +@addtogroup grp_wcc + +<div class="toc"><b>Contents</b> +<ul> +<li><a href="#wcc">Weakly Connected Components</a></li> +<li><a href="#notes">Notes</a></li> +<li><a href="#examples">Examples</a></li> +<li><a href="#literature">Literature</a></li> +</ul> +</div> + +@brief Find all weakly connected components of a graph. + +Given a directed graph, a weakly connected component is a subgraph of the original +graph where all vertices are connected to each other by some path, ignoring the +direction of edges. In case of an undirected graph, a weakly connected component is +also a strongly connected component. + +@anchor wcc +@par Weakly Connected Components +<pre class="syntax"> +weakly_connected_components( vertex_table, + vertex_id, + edge_table, + edge_args, + out_table, + grouping_cols + ) +</pre> + +\b Arguments +<dl class="arglist"> +<dt>vertex_table</dt> +<dd>TEXT. Name of the table containing the vertex data for the graph. Must contain the +column specified in the 'vertex_id' parameter below.</dd> + +<dt>vertex_id</dt> +<dd>TEXT, default = 'id'. Name of the column in 'vertex_table' containing +vertex ids. The vertex ids are of type INTEGER with no duplicates. +They do not need to be contiguous.</dd> + +<dt>edge_table</dt> +<dd>TEXT. Name of the table containing the edge data. The edge table must +contain columns for source vertex and destination vertex.</dd> + +<dt>edge_args</dt> +<dd>TEXT. A comma-delimited string containing multiple named arguments of +the form "name=value". The following parameters are supported for +this string argument: + - src (INTEGER): Name of the column containing the source vertex ids in the edge table. + Default column name is 'src'. + - dest (INTEGER): Name of the column containing the destination vertex ids in the edge table. + Default column name is 'dest'.</dd> + +<dt>out_table</dt> +<dd>TEXT. 
Name of the table to store the component ID associated with each vertex. +It will contain a row for every vertex from 'vertex_table' with +the following columns: + - vertex_id : The id of a vertex. Will use the input parameter 'vertex_id' for column naming. + - component_id : The vertex's component. + - grouping_cols : Grouping column (if any) values associated with the vertex_id.</dd> + +<dt>grouping_cols (optional)</dt> +<dd>TEXT, default: NULL. A single column or a list of comma-separated +columns that divides the input data into discrete groups, resulting in one +set of components per group. When this value is NULL, no grouping is used and +components are computed over the entire graph. +@note Grouping is not currently supported.</dd> + +</dl> + +@anchor notes +@par Notes + +See the Grail project [1] for more background on graph analytics processing +in relational databases. + +@anchor examples +@examp + +-# Create vertex and edge tables to represent the graph: +<pre class="syntax"> +DROP TABLE IF EXISTS vertex, edge; +CREATE TABLE vertex( + id INTEGER +); +CREATE TABLE edge( + src INTEGER, + dest INTEGER, + user_id INTEGER +); +INSERT INTO vertex VALUES +(0), +(1), +(2), +(3), +(4), +(5), +(6), +(10), +(11), +(12), +(13), +(14), +(15), +(16); +INSERT INTO edge VALUES +(0, 1, 1), +(0, 2, 1), +(1, 2, 1), +(1, 3, 1), +(2, 3, 1), +(2, 5, 1), +(2, 6, 1), +(3, 0, 1), +(5, 6, 1), +(6, 3, 1), +(10, 11, 2), +(10, 12, 2), +(11, 12, 2), +(11, 13, 2), +(12, 13, 2), +(13, 10, 2), +(15, 16, 2), +(15, 14, 2); +</pre> + +-# Find all the weakly connected components in the graph: +<pre class="syntax"> +DROP TABLE IF EXISTS wcc_out; +SELECT madlib.weakly_connected_components( + 'vertex', -- Vertex table + 'id', -- Vertex id column + 'edge', -- Edge table + 'src=src, dest=dest', -- Comma delimited string of edge arguments + 'wcc_out'); -- Output table of weakly connected components +SELECT * FROM wcc_out ORDER BY component_id, id; +</pre> +<pre class="result"> + id | 
component_id +----+-------------- + 0 | 0 + 1 | 0 + 2 | 0 + 3 | 0 + 5 | 0 + 6 | 0 + 4 | 4 + 10 | 10 + 11 | 10 + 12 | 10 + 13 | 10 + 14 | 14 + 15 | 14 + 16 | 14 +(14 rows) +</pre> + +-# Now find all the weakly connected components associated with each user +using the grouping feature: +<pre class="syntax"> +DROP TABLE IF EXISTS wcc_out, wcc_out_summary; +SELECT madlib.weakly_connected_components( + 'vertex', -- Vertex table + 'id', -- Vertex id column + 'edge', -- Edge table + 'src=src, dest=dest', -- Comma delimited string of edge arguments + 'wcc_out', -- Output table of weakly connected components + 'user_id'); -- Grouping column name +SELECT * FROM wcc_out ORDER BY user_id, component_id, id; +</pre> +<pre class="result"> + user_id | id | component_id +---------+----+-------------------- + +</pre> + +@anchor literature +@par Literature + +[1] The case against specialized graph analytics engines, J. Fan, G. Soosai Raj, +and J. M. Patel. CIDR 2015. http://cidrdb.org/cidr2015/Papers/CIDR15_Paper20.pdf +*/ + +------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.weakly_connected_components( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + out_table TEXT, + grouping_cols TEXT + +) RETURNS VOID AS $$ + PythonFunction(graph, wcc, wcc) +$$ LANGUAGE plpythonu VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.weakly_connected_components( + vertex_table TEXT, + vertex_id TEXT, + edge_table TEXT, + edge_args TEXT, + out_table TEXT + +) RETURNS VOID AS $$ + SELECT MADLIB_SCHEMA.weakly_connected_components($1, $2, $3, $4, $5, NULL); +$$ LANGUAGE sql VOLATILE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `MODIFIES SQL DATA', `'); +------------------------------------------------------------------------- + +-- Online help +CREATE OR REPLACE FUNCTION 
MADLIB_SCHEMA.weakly_connected_components( + message VARCHAR +) RETURNS VARCHAR AS $$ + PythonFunction(graph, wcc, wcc_help) +$$ LANGUAGE plpythonu IMMUTABLE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `'); + +------------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.weakly_connected_components() +RETURNS VARCHAR AS $$ + SELECT MADLIB_SCHEMA.weakly_connected_components(''); +$$ LANGUAGE sql IMMUTABLE +m4_ifdef(`\_\_HAS_FUNCTION_PROPERTIES\_\_', `CONTAINS SQL', `'); +------------------------------------------------------------------------------- +