khannaekta commented on a change in pull request #571:
URL: https://github.com/apache/madlib/pull/571#discussion_r708651533
##########
File path: src/ports/postgres/modules/dbscan/dbscan.sql_in
##########

@@ -132,12 +132,35 @@ performance reasons.</li>
 <dt>algorithm (optional)</dt>
 <dd>TEXT, default: 'brute_force'. The name of the algorithm
-used to compute clusters. Currently only brute force is supported:
+used to compute clusters.

Review comment:
   In the code, the default is 'optimized'. We should either update the docs here to reflect that, or update the code so that the default is 'brute_force'.

##########
File path: src/ports/postgres/modules/dbscan/test/dbscan.sql_in
##########

@@ -43,14 +43,22 @@ copy dbscan_train_data (id_in, data) FROM stdin delimiter '|';

 DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
 SELECT dbscan('dbscan_train_data','out1','id_in','data',20,4,'squared_dist_norm2','brute');

-SELECT assert(count(DISTINCT id_in) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id_in=ANY(ARRAY[1,2,3,4,5]);
+SELECT assert(count(DISTINCT id) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id=ANY(ARRAY[1,2,3,4,5]);

-SELECT assert(count(DISTINCT id_in) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id_in=ANY(ARRAY[6,7,8,9]);
+SELECT assert(count(DISTINCT id) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id=ANY(ARRAY[6,7,8,9]);
+
+SELECT assert(id_column = 'id_in', 'id_column field in summary table should have been ''id_in''') FROM out1_summary;

 SELECT dbscan_predict('out1', 'dbscan_train_data', 'id_in', 'data', 'out1_predict');

 SELECT assert(count(DISTINCT cluster_id) = 2, 'Incorrect cluster count') FROM out1_predict;

+DROP TABLE IF EXISTS out1, out1_summary, out1_predict;

Review comment:
   out1_predict can be removed from this DROP statement.
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

@@ -23,136 +23,246 @@ from utilities.control import MinWarning
 from utilities.utilities import _assert
 from utilities.utilities import unique_string
 from utilities.utilities import add_postfix
-from utilities.utilities import NUMERIC, ONLY_ARRAY
+from utilities.utilities import INTEGER, NUMERIC, ONLY_ARRAY
 from utilities.utilities import is_valid_psql_type
 from utilities.utilities import is_platform_pg
+from utilities.utilities import num_features
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid, output_tbl_valid
 from utilities.validate_args import is_var_valid
 from utilities.validate_args import cols_in_tbl_valid
 from utilities.validate_args import get_expr_type
 from utilities.validate_args import get_algorithm_name
 from graph.wcc import wcc

-BRUTE_FORCE = 'brute_force'
-KD_TREE = 'kd_tree'
+from math import log
+from math import ceil
+from math import sqrt
+from time import time
+from collections import deque

-def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps, min_samples, metric, algorithm, **kwargs):
+from scipy.spatial import distance
+import numpy as np
+
+import utilities.debug as DEBUG
+DEBUG.plpy_info_enabled = False
+DEBUG.plpy_execute_enabled = False
+DEBUG.timings_enabled = False
+
+try:
+    from rtree import index
+except ImportError:
+    RTREE_ENABLED = 0
+else:
+    RTREE_ENABLED = 1
+
+METHOD_BRUTE_FORCE = 'brute_force'
+METHOD_OPTIMIZED = 'optimized'
+DEFAULT_MIN_SAMPLES = 5
+DEFAULT_METRIC = 'squared_dist_norm2'
+
+def dbscan(schema_madlib, source_table, output_table, id_column, expr_point,
+           eps, min_samples, metric, algorithm, max_segmentation_depth, **kwargs):

     with MinWarning("warning"):

+        # algorithm=None is handled in get_algorithm_name()
+        min_samples = DEFAULT_MIN_SAMPLES if not min_samples else min_samples
+        metric = DEFAULT_METRIC if not metric else metric
+        num_segs = get_seg_number()

-        min_samples = 5 if not min_samples else min_samples
-        metric = 'squared_dist_norm2' if not metric else metric
-        algorithm = 'brute' if not algorithm else algorithm
+        algorithm = get_algorithm_name(algorithm, METHOD_OPTIMIZED,
+                                       [METHOD_BRUTE_FORCE, METHOD_OPTIMIZED], 'DBSCAN')

-        algorithm = get_algorithm_name(algorithm, BRUTE_FORCE,
-                                       [BRUTE_FORCE, KD_TREE], 'DBSCAN')
+        if max_segmentation_depth is None:
+            # Default to num_segs
+            max_depth = num_segs
+        else:
+            max_depth = max_segmentation_depth
+            if algorithm != METHOD_OPTIMIZED:
+                plpy.warning("Ignoring max_segmentation_depth={} param, "
+                             "N/A to algorithm={}".format(max_depth, algorithm))

         _validate_dbscan(schema_madlib, source_table, output_table, id_column,
-                         expr_point, eps, min_samples, metric, algorithm)
+                         expr_point, eps, min_samples, metric, algorithm, max_depth)

-        dist_src_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
-        dist_id_sql = '' if is_platform_pg() else 'DISTRIBUTED BY ({0})'.format(id_column)
-        dist_reach_sql = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (__src__)'
+        dist_by_id = '' if is_platform_pg() else 'DISTRIBUTED BY (id)'
+        dist_by_reach_id = '' if is_platform_pg() else 'DISTRIBUTED BY (__reachable_id__)'
+        dist_by_seg_id = '' if is_platform_pg() else 'DISTRIBUTED BY (seg_id)'
+        distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'

-        # Calculate pairwise distances
+        core_points_table = unique_string(desp='core_points_table')
+        core_edge_table = unique_string(desp='core_edge_table')
         distance_table = unique_string(desp='distance_table')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(distance_table))
+        source_view = unique_string(desp='source_view')
+        plpy.execute("DROP VIEW IF EXISTS {0}".format(source_view))
         sql = """
-            CREATE TABLE {distance_table} AS
-            SELECT __src__, __dest__ FROM (
-                SELECT __t1__.{id_column} AS __src__,
-                       __t2__.{id_column} AS __dest__,
-                       {schema_madlib}.{metric}(
-                           __t1__.{expr_point}, __t2__.{expr_point}) AS __dist__
-                FROM {source_table} AS __t1__, {source_table} AS __t2__
-                WHERE __t1__.{id_column} != __t2__.{id_column}) q1
-            WHERE __dist__ < {eps}
-            {dist_src_sql}
+            CREATE VIEW {source_view} AS
+            SELECT ({id_column})::BIGINT AS id, ({expr_point})::DOUBLE PRECISION[] AS point

Review comment:
   We have a dev-check test for passing the point as an expression. Can we also add a dev-check test where id_column is passed in as an expression?
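   A rough sketch of what such a test could look like (the id expression, expected
   ids, and cluster numbering below are illustrative, mirroring the existing test,
   and not verified):

   DROP TABLE IF EXISTS out2, out2_summary;
   SELECT dbscan('dbscan_train_data','out2','id_in + 100','data',20,4,'squared_dist_norm2','brute');

   -- cluster 0 should now contain the shifted ids 101..105
   SELECT assert(count(DISTINCT id) = 5, 'Incorrect cluster 0 with id expression')
   FROM out2 WHERE cluster_id = 0 AND id = ANY(ARRAY[101,102,103,104,105]);

   -- the summary table should record the expression verbatim
   SELECT assert(id_column = 'id_in + 100',
       'id_column field in summary table should match the expression') FROM out2_summary;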
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

@@ -164,76 +274,1271 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
         """.format(**locals())
         plpy.execute(sql)

+        # -- Generate output summary table ---
+
         output_summary_table = add_postfix(output_table, '_summary')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))

+        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
         sql = """
             CREATE TABLE {output_summary_table} AS
             SELECT  '{id_column}'::VARCHAR AS id_column,
+                    '{expr_point}'::VARCHAR AS expr_point,
                     {eps}::DOUBLE PRECISION AS eps,
+                    {min_samples}::BIGINT AS min_points,
                     '{metric}'::VARCHAR AS metric
             """.format(**locals())
         plpy.execute(sql)

-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
-            distance_table, core_points_table, core_edge_table,
-            reachable_points_table))

+class DBSCAN(object):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric):
+        """
+            Args:
+                @param schema_madlib    Name of the Madlib Schema
+                @param source_table     Training data table
+                @param output_table     Output table of points labelled by cluster_id
+                @param eps              The eps value defined by the user
+                @param min_samples      min_samples defined by user
+                @param metric           metric selected by user
+
+        """
+        self.schema_madlib = schema_madlib
+        self.source_table = source_table
+        self.output_table = output_table
+        self.n_dims = num_features(source_table, 'point')
+        self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'
+        self.min_samples = min_samples
+        self.metric = metric
+        # If squared_dist_norm2 is used, we assume eps is set for the squared
+        # distance. That means the border width must be sqrt(eps)
+        self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+class DBSCAN_optimized(DBSCAN):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric, max_depth=None):
+        """
+            Args:
+                @param schema_madlib    Name of the Madlib Schema
+                @param source_table     Training data table
+                @param output_table     Output table of points labelled by cluster_id
+                @param eps              The eps value defined by the user
+                @param min_samples      min_samples defined by user
+                @param metric           metric selected by user
+        """
+
+        DBSCAN.__init__(self, schema_madlib, source_table, output_table,
+                        eps, min_samples, metric)
+        self.max_depth = max_depth
+        self.unique_str = unique_string('DBSCAN_opt')
+        self.num_segs = get_seg_number()
+        res = plpy.execute("SELECT COUNT(*) FROM {source_table}".format(**locals()))
+        self.num_points = res[0]['count']
+
+    def run(self):
+        unique_str = self.unique_str
+        distributed_by = self.distributed_by
+        schema_madlib = self.schema_madlib
+
+        self.dist_map = 'dist_map' + unique_str
+
+        segmented_source = 'segmented_source' + unique_str
+        self.segmented_source = segmented_source
+        self.build_optimized_tree()
+
+        rowcount_table = 'rowcounts' + unique_str
+        leaf_clusters_table = 'leaf_clusters' + unique_str
+
+        count_rows_sql = """
+            DROP TABLE IF EXISTS {rowcount_table};
+            CREATE TEMP TABLE {rowcount_table} AS
+                WITH i AS (
+                    SELECT
+                        count(*) AS num_rows,
+                        dist_id,
+                        __dist_id__
+                    FROM internal_{segmented_source}
+                    GROUP BY dist_id, __dist_id__
+                ), e AS (
+                    SELECT
+                        count(*) AS num_rows,
+                        dist_id,
+                        __dist_id__
+                    FROM external_{segmented_source}
+                    GROUP BY dist_id, __dist_id__
+                )
+                SELECT
+                    i.num_rows AS num_internal_rows,
+                    COALESCE(e.num_rows,0) AS num_external_rows,
+                    i.dist_id,
+                    __dist_id__
+                FROM i LEFT JOIN e
+                    USING (dist_id, __dist_id__)
+            {distributed_by}
+        """.format(**locals())
+        plpy.execute(count_rows_sql)
+
+        # Run dbscan in parallel on each leaf
+        dbscan_leaves_sql = """
+            CREATE TEMP TABLE {leaf_clusters_table} AS
+            WITH input AS (
+                SELECT *
+                FROM internal_{segmented_source}
+                UNION ALL
+                SELECT *
+                FROM external_{segmented_source}
+            ), segmented_source AS (
+                SELECT
+                    ROW(
+                        id,
+                        point,
+                        FALSE,
+                        0,
+                        leaf_id,
+                        dist_id
+                    )::{schema_madlib}.__dbscan_record AS rec,
+                    __dist_id__
+                FROM input
+            )
+            SELECT (output).*, __dist_id__ FROM (
+                SELECT
+                    {schema_madlib}.__dbscan_leaf(
+                        rec,
+                        {self.eps},
+                        {self.min_samples},
+                        '{self.metric}',
+                        num_internal_rows,
+                        num_external_rows
+                    ) AS output,
+                    __dist_id__
+                FROM segmented_source JOIN {rowcount_table}
+                    USING (__dist_id__)
+            ) a {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS
+                internal_{segmented_source},
+                external_{segmented_source}
+        """.format(**locals()))
+
+        cluster_map = 'cluster_map' + unique_str
+        noise_point = label.NOISE_POINT
+
+        # Replace local cluster_id's in each leaf with
+        # global cluster_id's which are unique across all leaves
+        gen_cluster_map_sql = """
+            CREATE TEMP TABLE {cluster_map} AS
+                SELECT
+                    dist_id,
+                    __dist_id__,
+                    cluster_id AS local_id,
+                    ROW_NUMBER() OVER(
+                        ORDER BY dist_id, cluster_id
+                    ) AS global_id
+                FROM {leaf_clusters_table}
+                WHERE dist_id = leaf_id AND cluster_id != {noise_point}
+                GROUP BY __dist_id__, dist_id, cluster_id
+            {distributed_by}
+        """.format(**locals())
+        plpy.execute(gen_cluster_map_sql)
+
+        globalize_cluster_ids_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = global_id
+            FROM {cluster_map} cm WHERE
+                cm.dist_id = lc.dist_id AND
+                cluster_id = local_id;
+        """.format(**locals())
+        DEBUG.plpy.execute(globalize_cluster_ids_sql)
+
+        intercluster_edges = 'intercluster_edges' + unique_str
+        dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)'
+
+        # Build intercluster table of edges connecting clusters
+        # on different leaves
+        intercluster_edge_sql = """
+            CREATE TEMP TABLE {intercluster_edges} AS
+                SELECT
+                    internal.cluster_id AS src,
+                    external.cluster_id AS dest
+                FROM {leaf_clusters_table} internal
+                JOIN {leaf_clusters_table} external
+                    USING (id)
+                WHERE internal.is_core_point
+                    AND internal.leaf_id = internal.dist_id
+                    AND external.leaf_id != external.dist_id
+                GROUP BY src,dest
+            {dist_by_src}
+        """.format(**locals())
+        DEBUG.plpy.execute(intercluster_edge_sql)
+
+        res = plpy.execute("SELECT count(*) FROM {intercluster_edges}".format(**locals()))
+
+        if int(res[0]['count']) > 0:
+            wcc_table = 'wcc' + unique_str
+            plpy.execute("""
+                DROP TABLE IF EXISTS
+                    {wcc_table}, {wcc_table}_summary,
+                    {self.output_table}
+            """.format(**locals()))
+
+            # Find connected components of intercluster_edge_table
+            wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges,
+                'src=src,dest=dest', wcc_table, None)
+
+            # Rename each cluster_id to its component id in the intercluster graph
+            merge_clusters_sql = """
+                UPDATE {leaf_clusters_table} lc
+                    SET cluster_id = wcc.component_id
+                FROM {wcc_table} wcc
+                WHERE lc.cluster_id = wcc.global_id
+            """.format(**locals())
+            plpy.execute(merge_clusters_sql)
+
+            plpy.execute("""
+                DROP TABLE IF EXISTS
+                    {wcc_table}, {wcc_table}_summary
+            """.format(**locals()))
+
+        add_crossborder_points_sql = """
+            CREATE TABLE {self.output_table} AS
+            SELECT
+                id, point, cluster_id, is_core_point
+            FROM (
+                SELECT
+                    id,
+                    point,
+                    is_core_point,
+                    leaf_id = dist_id AS is_internal,
+                    dist_id,
+                    CASE WHEN cluster_id = {noise_point}
+                        THEN MAX(cluster_id) OVER(PARTITION BY id)
+                        ELSE cluster_id
+                    END
+                FROM {leaf_clusters_table}
+            ) x WHERE is_internal AND cluster_id != {noise_point}
+        """.format(**locals())
+        DEBUG.plpy.execute(add_crossborder_points_sql)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS {rowcount_table},
+                {leaf_clusters_table}, {cluster_map},
+                {intercluster_edges}
+        """.format(**locals()))
+
+    def build_optimized_tree(self):
+        eps = self.eps
+        num_segs = self.num_segs
+        num_points = self.num_points
+        target = num_points / num_segs
+        source_table = self.source_table
+        leaf_bounds_table = 'leaf_bounds' + self.unique_str
+        max_depth = self.max_depth
+        n_dims = self.n_dims
+        dist_map = self.dist_map
+        distributed_by = self.distributed_by
+        internal_points = 'internal_' + self.segmented_source
+        external_points = 'external_' + self.segmented_source
+
+        next_cut = 'next_cut' + self.unique_str
+        prev_node = 'prev_node' + self.unique_str
+
+        first_cut_sql = """
+            DROP TABLE IF EXISTS {prev_node};
+            CREATE TEMP TABLE {prev_node} AS
+            WITH world_bounds AS (
+                SELECT
+                    i,
+                    min(point[i]) AS lower,
+                    max(point[i]) AS upper,
+                    max(point[i]) - min(point[i]) AS size,
+                    ceil((max(point[i]) - min(point[i])) / {eps})::INT AS eps_bins
+                FROM generate_series(1,{n_dims}) AS i, {source_table}
+                GROUP BY i
+            ),
+            density_map AS (
+                SELECT i, eps_bin,
+                    eps_bins,
+                    COALESCE(density, 0) AS density
+                FROM (
+                    SELECT i, eps_bins,
+                        generate_series(0, eps_bins - 1) AS eps_bin
+                    FROM world_bounds
+                ) g LEFT JOIN (
+                    SELECT i,
+                        floor((point[i] - lower)/{eps})::INT AS eps_bin,
+                        eps_bins,
+                        COUNT(*) AS density
+                    FROM
+                        world_bounds,
+                        {source_table}
+                    GROUP BY i, eps_bin, eps_bins
+                ) a
+                USING (i, eps_bin, eps_bins)
+            ),
+            loss_table AS (
+                SELECT i, eps_bin,
+                    left_internal,
+                    {num_points} as points_in_node,
+                    left_external,
+                    right_external,
+                    {self.schema_madlib}.__dbscan_segmentation_loss(
+                        left_internal,
+                        {num_points} - left_internal,
+                        left_external,
+                        right_external,
+                        {num_segs}::BIGINT,
+                        V,
+                        {eps}::DOUBLE PRECISION,
+                        {n_dims}::BIGINT,
+                        eps_bin::DOUBLE PRECISION,
+                        eps_bins::DOUBLE PRECISION
+                    ) AS losses
+                FROM (
+                    SELECT i, eps_bin, eps_bins,
+                        {num_segs}::BIGINT AS num_segs,
+                        COALESCE(density, 0) AS right_external,
+                        COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external,
+                        SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal
+                    FROM (
+                        SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds
+                    ) g LEFT JOIN density_map USING(i, eps_bin)
+                ) params,
+                (
+                    SELECT
+                        {self.schema_madlib}.__dbscan_safe_exp(sum(
+                            {self.schema_madlib}.__dbscan_safe_ln(size)
+                        )) AS V

Review comment:
   Isn't this equivalent to `sum(size)`? Why do we need to call `__dbscan_safe_exp` and `__dbscan_safe_ln`?

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

@@ -653,25 +651,26 @@ class DBSCAN_optimized(DBSCAN):
             )
             SELECT
                 s.id,
                 ARRAY[i] AS coords,
-                CASE WHEN s.point[i] <= cutoff
+                CASE WHEN s.point[i] < cutoff

Review comment:
   Do you think it makes sense to add a dev-check test with a small dataset, to make sure we do not break this in the future? I will leave it to you; we can skip it if you don't think it will help.
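   A rough sketch of such a test (the table name, point values and parameters below
   are made up; the idea is to check the 'optimized' cut handling against
   'brute_force' on a tiny dataset):

   DROP TABLE IF EXISTS dbscan_cutoff_data;
   CREATE TABLE dbscan_cutoff_data (id_in INTEGER, data DOUBLE PRECISION[]);
   -- two well-separated groups of four points each
   INSERT INTO dbscan_cutoff_data VALUES
       (1, ARRAY[1.0, 1.0]), (2, ARRAY[1.0, 2.0]), (3, ARRAY[2.0, 1.0]), (4, ARRAY[2.0, 2.0]),
       (5, ARRAY[9.0, 9.0]), (6, ARRAY[9.0, 10.0]), (7, ARRAY[10.0, 9.0]), (8, ARRAY[10.0, 10.0]);

   DROP TABLE IF EXISTS out_bf, out_bf_summary, out_opt, out_opt_summary;
   SELECT dbscan('dbscan_cutoff_data','out_bf','id_in','data',3,3,'dist_norm2','brute_force');
   SELECT dbscan('dbscan_cutoff_data','out_opt','id_in','data',3,3,'dist_norm2','optimized');

   -- both algorithms should agree on the number of clusters
   SELECT assert(
       (SELECT count(DISTINCT cluster_id) FROM out_bf) =
       (SELECT count(DISTINCT cluster_id) FROM out_opt),
       'optimized and brute_force should find the same clusters');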
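   Regarding the `__dbscan_safe_exp` / `__dbscan_safe_ln` question above:
   `exp(sum(ln(size)))` multiplies the per-dimension sizes, i.e. it computes the
   bounding-box volume rather than `sum(size)`; the `__dbscan_safe_*` wrappers
   presumably just guard that computation, e.g. against zero-length dimensions.
   A small worked example with plain PostgreSQL exp/ln and made-up sizes:

   SELECT exp(sum(ln(size))) AS volume,   -- 2 * 3 * 4 = 24
          sum(size)          AS sum_size  -- 2 + 3 + 4 = 9
   FROM unnest(ARRAY[2.0, 3.0, 4.0]) AS t(size);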
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

@@ -276,24 +1588,35 @@ def dbscan_help(schema_madlib, message=None, **kwargs):
 SELECT {schema_madlib}.dbscan(
     source_table,  -- Name of the training data table
     output_table,  -- Name of the output table
-    id_column,     -- Name of id column in source_table
+    id_column,     -- A column or expression of columns specifying a unique id for rows in the source_table
     expr_point,    -- Column name or expression for data points
     eps,           -- The minimum radius of a cluster
    min_samples,   -- The minimum size of a cluster
     metric,        -- The name of the function to use to calculate the
                    -- distance
-    algorithm      -- The algorithm to use for dbscan.
+    algorithm,     -- The algorithm to use for dbscan: brute or rtree

Review comment:
   Update 'rtree' to 'optimized' here. Also, let's specify the default.

##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########

@@ -164,76 +274,1271 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
         """.format(**locals())
         plpy.execute(sql)

+        # -- Generate output summary table ---
+
         output_summary_table = add_postfix(output_table, '_summary')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))

+        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
         sql = """
             CREATE TABLE {output_summary_table} AS
             SELECT  '{id_column}'::VARCHAR AS id_column,
+                    '{expr_point}'::VARCHAR AS expr_point,
                     {eps}::DOUBLE PRECISION AS eps,
+                    {min_samples}::BIGINT AS min_points,

Review comment:
   Currently we do not capture the algorithm that was used by dbscan; I think we should also add that to the summary table.

##########
File path: src/ports/postgres/modules/dbscan/test/dbscan.sql_in
##########

@@ -43,14 +43,22 @@ copy dbscan_train_data (id_in, data) FROM stdin delimiter '|';

 DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
 SELECT dbscan('dbscan_train_data','out1','id_in','data',20,4,'squared_dist_norm2','brute');

-SELECT assert(count(DISTINCT id_in) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id_in=ANY(ARRAY[1,2,3,4,5]);
+SELECT assert(count(DISTINCT id) = 5, 'Incorrect cluster 0') FROM out1 WHERE cluster_id = 0 and id=ANY(ARRAY[1,2,3,4,5]);

-SELECT assert(count(DISTINCT id_in) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id_in=ANY(ARRAY[6,7,8,9]);
+SELECT assert(count(DISTINCT id) = 4, 'Incorrect cluster 1') FROM out1 WHERE cluster_id = 1 and id=ANY(ARRAY[6,7,8,9]);
+
+SELECT assert(id_column = 'id_in', 'id_column field in summary table should have been ''id_in''') FROM out1_summary;

 SELECT dbscan_predict('out1', 'dbscan_train_data', 'id_in', 'data', 'out1_predict');

 SELECT assert(count(DISTINCT cluster_id) = 2, 'Incorrect cluster count') FROM out1_predict;

+DROP TABLE IF EXISTS out1, out1_summary, out1_predict;
+SELECT dbscan('dbscan_train_data','out1','id_in','data',20,4,'dist_norm2','brute');
+
+DROP TABLE IF EXISTS out1, out1_summary, out1_predict;

Review comment:
   We don't need to drop out1_predict here. While looking at the tests initially, this drop of the '_predict' table made me wonder whether we were calling predict, hence this comment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@madlib.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org