This is an automated email from the ASF dual-hosted git repository. okislal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 5cee52e5bc1b980ba63eb92c6178118473421512 Author: Ekta Khanna <[email protected]> AuthorDate: Wed Apr 10 15:27:55 2019 -0700 WCC: Duplicate edges to improve performance on Greenplum JIRA: MADLIB-1320 Prior to this update, we used the edge table in join clauses with both src and dest columns. This commit updates the query on the message table such that the JOIN always operates on the column that the table is distributed on. To this end, we duplicate the edge table to distribute it on the dest column. This separation of accesses reduces redistribute motions on GPDB. In addition, we split the union all query to ensure that the working set stays limited for very large graphs. Closes #364 Co-authored-by: Orhan Kislal <[email protected]> --- src/ports/postgres/modules/graph/wcc.py_in | 50 +++++++++++++++++++---------- src/ports/postgres/modules/graph/wcc.sql_in | 5 +++ 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in index de1af27..4b4b05d 100644 --- a/src/ports/postgres/modules/graph/wcc.py_in +++ b/src/ports/postgres/modules/graph/wcc.py_in @@ -108,6 +108,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, newupdate = unique_string(desp='newupdate') toupdate = unique_string(desp='toupdate') temp_out_table = unique_string(desp='tempout') + edge_inverse = unique_string(desp='edge_inverse') distribution = '' if is_platform_pg() else \ "DISTRIBUTED BY ({0})".format(vertex_id) @@ -117,12 +118,23 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, old_new_update_where_condition = '' new_to_update_where_condition = '' edge_to_update_where_condition = '' + edge_inverse_to_update_where_condition = '' INT_MAX = 2147483647 component_id = 'component_id' grouping_cols_comma = '' if not grouping_cols else grouping_cols + ',' comma_grouping_cols = '' if not grouping_cols else ',' + grouping_cols + if not is_platform_pg(): + # In 
Greenplum, to avoid redistribution of data when in later queries, + # edge_table is duplicated by creating a temporary table distributed + # on dest column + plpy.execute(""" CREATE TEMP TABLE {edge_inverse} AS + SELECT * FROM {edge_table} DISTRIBUTED BY ({dest}) + """.format(**locals())) + else: + edge_inverse = edge_table + if grouping_cols: distribution = ('' if is_platform_pg() else "DISTRIBUTED BY ({0}, {1})".format(grouping_cols, @@ -146,6 +158,8 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, _check_groups(newupdate, toupdate, grouping_cols_list) edge_to_update_where_condition = ' AND ' + \ _check_groups(edge_table, toupdate, grouping_cols_list) + edge_inverse_to_update_where_condition = ' AND ' + \ + _check_groups(edge_inverse, toupdate, grouping_cols_list) join_grouping_cols = _check_groups(subq, distinct_grp_table, grouping_cols_list) group_by_clause_newupdate = ('' if not grouping_cols else '{0}, {1}.{2}'.format(subq_prefixed_grouping_cols, @@ -160,7 +174,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, FROM {edge_table} UNION SELECT {select_grouping_cols_clause} {dest} AS {vertex_id} - FROM {edge_table} + FROM {edge_inverse} ) {subq} ON {join_grouping_cols} GROUP BY {group_by_clause_newupdate} @@ -201,7 +215,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, # updated in the next table. At every iteration update only those nodes # whose component_id in the previous iteration are greater than what was # found in the current iteration. 
- plpy.execute("DROP TABLE IF EXISTS {0}".format(oldupdate)) plpy.execute(""" CREATE TEMP TABLE {oldupdate} AS @@ -215,7 +228,6 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, ', {0}'.format(grouping_cols), group_by_clause=grouping_cols_comma, **locals())) - plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) plpy.execute(""" CREATE TEMP TABLE {toupdate} AS @@ -240,27 +252,28 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, plpy.execute("DROP TABLE IF EXISTS {0}".format(message)) plpy.execute(""" CREATE TEMP TABLE {message} AS - SELECT {vertex_id}, MIN({component_id}) AS {component_id} - {select_grouping_cols} - FROM ( - SELECT {edge_table}.{src} AS {vertex_id}, - {toupdate}.{component_id} + SELECT {edge_inverse}.{src} AS {vertex_id}, + MIN({toupdate}.{component_id}) AS {component_id} {comma_toupdate_prefixed_grouping_cols} - FROM {toupdate}, {edge_table} - WHERE {edge_table}.{dest} = {toupdate}.{vertex_id} - {edge_to_update_where_condition} - UNION ALL + FROM {toupdate}, {edge_inverse} + WHERE {edge_inverse}.{dest} = {toupdate}.{vertex_id} + {edge_inverse_to_update_where_condition} + GROUP BY {edge_inverse}.{src} {comma_toupdate_prefixed_grouping_cols} + """.format(select_grouping_cols='' if not grouping_cols + else ', {0}'.format(grouping_cols), + **locals())) + + plpy.execute(""" + INSERT INTO {message} SELECT {edge_table}.{dest} AS {vertex_id}, - {toupdate}.{component_id} + MIN({toupdate}.{component_id}) AS {component_id} {comma_toupdate_prefixed_grouping_cols} FROM {toupdate}, {edge_table} WHERE {edge_table}.{src} = {toupdate}.{vertex_id} {edge_to_update_where_condition} - ) AS t - GROUP BY {group_by_clause} {vertex_id} + GROUP BY {edge_table}.{dest} {comma_toupdate_prefixed_grouping_cols} """.format(select_grouping_cols='' if not grouping_cols - else ', {0}'.format(grouping_cols), group_by_clause='' - if not grouping_cols else ' {0}, '.format(grouping_cols), + else ', {0}'.format(grouping_cols), 
**locals())) plpy.execute("DROP TABLE {0}".format(oldupdate)) @@ -278,6 +291,9 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, SELECT COUNT(*) AS cnt FROM {toupdate} """.format(**locals()))[0]["cnt"] + if not is_platform_pg(): + # Drop intermediate table created for Greenplum + plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_inverse)) plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table)) # Create summary table. We only need the vertex_id and grouping columns # in it. diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in index f5879b9..1c3808b 100644 --- a/src/ports/postgres/modules/graph/wcc.sql_in +++ b/src/ports/postgres/modules/graph/wcc.sql_in @@ -115,6 +115,11 @@ weakly connected components are generated for all data </dl> +@note On a Greenplum cluster, the edge table should be distributed on the src +column for better performance. In addition, the user should note that this +function creates a duplicate of the edge table (on a Greenplum cluster) for +better performance. + @anchor rlcc @par Retrieve Largest Connected Component
