GitHub user njayaram2 commented on a diff in the pull request:
https://github.com/apache/incubator-madlib/pull/141#discussion_r125089028
--- Diff: src/ports/postgres/modules/graph/bfs.py_in ---
@@ -0,0 +1,479 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Breadth-First Search
+
+# Please refer to the bfs.sql_in file for the documentation
+
+"""
+@file bfs.py_in
+
+@namespace graph
+"""
+
+import plpy
+from graph_utils import validate_graph_coding
+from graph_utils import get_graph_usage
+from graph_utils import _grp_null_checks
+from utilities.control import MinWarning
+from utilities.utilities import _assert
+from utilities.utilities import extract_keyvalue_params
+from utilities.utilities import split_quoted_delimited_str
+from utilities.validate_args import table_exists
+from utilities.validate_args import columns_exist_in_table
+
+m4_changequote(`<!', `!>')
+
def _validate_bfs(vertex_table, vertex_id, edge_table, edge_params,
                  source_vertex, out_table, max_distance, directed,
                  grouping_cols_list, **kwargs):
    """
    Validate the arguments of the graph BFS function before any output
    table is created.

    Args:
        @param vertex_table        Name of the table that contains the vertex
                                   data.
        @param vertex_id           Name of the column containing the vertex
                                   ids.
        @param edge_table          Name of the table that contains the edge
                                   data.
        @param edge_params         Parsed edge arguments (the 'src'/'dest'
                                   column names), forwarded to
                                   validate_graph_coding.
        @param source_vertex       Vertex id the search starts from; must be
                                   an int that is present in vertex_table.
        @param out_table           Name of the output table; the companion
                                   table out_table + "_summary" must not
                                   exist yet.
        @param max_distance        Maximum search distance; must be an int
                                   in the range [0, 2147483647].
        @param directed            Must be a bool.
        @param grouping_cols_list  List of grouping column names, or None
                                   when no grouping is requested.

    Returns None. A failed check is reported via plpy.error (which raises)
    or via _assert (presumably raising with the given message -- see
    utilities.utilities).
    """
    # Largest value of a PostgreSQL INTEGER; graph_bfs uses it as the
    # default max_distance, so it must itself be accepted.
    int_max = 2147483647

    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'BFS')

    # Check the type first: comparing a non-integer with 0 is meaningless
    # (and raises TypeError on Python 3) instead of giving this message.
    _assert(isinstance(max_distance, int) and 0 <= max_distance <= int_max,
            """Graph BFS: Invalid max_distance type or value ({0}), must be
            integer, be greater than or equal to 0 and be less than or equal
            to max allowable integer (2147483647).""".format(max_distance))

    _assert(isinstance(directed, bool),
            """Graph BFS: Invalid value for directed ({0}), must be
            boolean.""".format(directed))

    # source_vertex is interpolated into the SQL below, so its type has to
    # be verified before the query is built.
    _assert(isinstance(source_vertex, int),
            """Graph BFS: Source vertex {0} has to be an
            integer.""".format(source_vertex))

    # Only existence matters: fetch at most one constant row.
    src_exists = plpy.execute("""
        SELECT 1 FROM {vertex_table}
        WHERE {vertex_id} = {source_vertex}
        LIMIT 1
        """.format(vertex_table=vertex_table,
                   vertex_id=vertex_id,
                   source_vertex=source_vertex))
    if src_exists.nrows() == 0:
        plpy.error(
            """Graph BFS: Source vertex {0} is not present in the
            vertex table {1}.""".format(source_vertex, vertex_table))

    # Any non-NULL vertex id occurring more than once is a duplicate.
    vt_error = plpy.execute(
        """ SELECT {vertex_id}
            FROM {vertex_table}
            WHERE {vertex_id} IS NOT NULL
            GROUP BY {vertex_id}
            HAVING count(*) > 1 """.format(vertex_id=vertex_id,
                                           vertex_table=vertex_table))
    if vt_error.nrows() != 0:
        plpy.error(
            """Graph BFS: Source vertex table {0} contains duplicate
            vertex id's.""".format(vertex_table))

    _assert(not table_exists(out_table + "_summary"),
            "Graph BFS: Output summary table already exists!")

    if grouping_cols_list is not None:
        _assert(columns_exist_in_table(edge_table, grouping_cols_list),
                """Graph BFS: Not all columns from {0} are present
                in edge table ({1}).""".format(grouping_cols_list,
                                               edge_table))
+
+
+def graph_bfs(schema_madlib, vertex_table, vertex_id, edge_table,
+ edge_args, source_vertex, out_table, max_distance, directed,
grouping_cols,
+ **kwargs):
+
+ """
+ Breadth First Search algorithm for graphs [1].
+ Args:
+ @param vertex_table Name of the table that contains the vertex
data.
+ @param vertex_id Name of the column containing the vertex
ids.
+ @param edge_table Name of the table that contains the edge
data.
+ @param edge_args A comma-delimited string containing multiple
+ named arguments of the form "name=value".
+ @param source_vertex The source vertex id for the algorithm to
start.
+ @param out_table Name of the table to store the result of
SSSP.
+ @param max_distance Maximum distance from the source_vertex to
search for.
+ @param directed Graph will be treated as directed if this
boolean flag
+ is set to TRUE. Graph is treated as
undirected by default.
+ @param grouping_cols The list of grouping columns.
+
+ [1] https://en.wikipedia.org/wiki/Breadth-first_search
+ """
+
+ with MinWarning("warning"):
+
+ INT_MAX = 2147483647
+
+ params_types = {'src': str, 'dest': str}
+ default_args = {'src': 'src', 'dest': 'dest'}
+ edge_params = extract_keyvalue_params(edge_args,
+ params_types,
+ default_args)
+
+ # Prepare the input for recording in the summary table
+ if vertex_id is None:
+ v_st= "NULL"
+ vertex_id = "id"
+ else:
+ v_st = vertex_id
+ if edge_args is None:
+ e_st = "NULL"
+ else:
+ e_st = edge_args
+ if max_distance is None:
+ d_st= "NULL"
+ max_distance = INT_MAX
+ else:
+ d_st = max_distance
+ if directed is None:
+ dir_st= "NULL"
+ directed = False
+ else:
+ dir_st = directed
+ if grouping_cols is None:
+ g_st = "NULL"
+ glist = None
+ else:
+ g_st = grouping_cols
+ glist = split_quoted_delimited_str(grouping_cols)
+
+ src = edge_params["src"]
+ dest = edge_params["dest"]
+
+ distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+ <!"DISTRIBUTED BY ({0})".format(vertex_id)!>)
+ local_distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+ <!"DISTRIBUTED BY (id)"!>)
+
+ _validate_bfs(vertex_table, vertex_id, edge_table,
+ edge_params, source_vertex, out_table, max_distance, directed,
glist)
+
+ # Initialize grouping related variables
+ grp_comma = ""
+ and_grp_null_checks = ""
+
+ if grouping_cols is not None:
+ grp_comma = grouping_cols + ", "
+ and_grp_null_checks = " AND " + _grp_null_checks(glist)
+
+ # We keep a table of every vertex, the distance to that vertex
from source
+ # and the parent in the path to the vertex
+ # This table will be updated throughout the execution.
+ dist_col = "dist"
+ parent_col = "parent"
+ curr_dist_val = 0
+
+ # Creating the output table with the appropriate columns and data
types
+ plpy.execute("""
+ CREATE TABLE {out_table} AS (
+ SELECT
+ {grp_comma}
+ {src} AS {vertex_id},
+ {curr_dist_val}::INT AS {dist_col},
+ {src} AS {parent_col}
+ FROM {edge_table}
+ LIMIT 0
+ ) {distribution}""".format(**locals()))
--- End diff --
We don't need to distribute the output table unless doing so speeds up the
execution of this algorithm itself. Consumers of MADlib functions normally
use `ORDER BY` to view results, and may even redistribute the output table
to suit their own needs.
If this distribution is indeed required for faster retrieval or joins in the
queries that follow, then consider distributing by `{grouping_cols}` in
addition to `{vertex_id}`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---