reductionista commented on a change in pull request #571:
URL: https://github.com/apache/madlib/pull/571#discussion_r709658024
##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -164,76 +274,1271 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
         """.format(**locals())
         plpy.execute(sql)

+    # -- Generate output summary table ---
+
     output_summary_table = add_postfix(output_table, '_summary')
-    plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
+    plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
     sql = """
         CREATE TABLE {output_summary_table} AS
         SELECT
             '{id_column}'::VARCHAR AS id_column,
+            '{expr_point}'::VARCHAR AS expr_point,
             {eps}::DOUBLE PRECISION AS eps,
+            {min_samples}::BIGINT AS min_points,
             '{metric}'::VARCHAR AS metric
         """.format(**locals())
     plpy.execute(sql)

-    plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
-        distance_table, core_points_table, core_edge_table,
-        reachable_points_table))
+class DBSCAN(object):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric):
+        """
+        Args:
+            @param schema_madlib    Name of the Madlib Schema
+            @param source_table     Training data table
+            @param output_table     Output table of points labelled by cluster_id
+            @param eps              The eps value defined by the user
+            @param min_samples      min_samples defined by user
+            @param metric           metric selected by user
+        """
+        self.schema_madlib = schema_madlib
+        self.source_table = source_table
+        self.output_table = output_table
+        self.n_dims = num_features(source_table, 'point')
+        self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'
+        self.min_samples = min_samples
+        self.metric = metric
+        # If squared_dist_norm2 is used, we assume eps is set for the squared
+        # distance. That means the border width must be sqrt(eps)
+        self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
+
+class DBSCAN_optimized(DBSCAN):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric, max_depth=None):
+        """
+        Args:
+            @param schema_madlib    Name of the Madlib Schema
+            @param source_table     Training data table
+            @param output_table     Output table of points labelled by cluster_id
+            @param eps              The eps value defined by the user
+            @param min_samples      min_samples defined by user
+            @param metric           metric selected by user
+        """
+
+        DBSCAN.__init__(self, schema_madlib, source_table, output_table,
+                        eps, min_samples, metric)
+        self.max_depth = max_depth
+        self.unique_str = unique_string('DBSCAN_opt')
+        self.num_segs = get_seg_number()
+        res = plpy.execute("SELECT COUNT(*) FROM {source_table}".format(**locals()))
+        self.num_points = res[0]['count']
+
+    def run(self):
+        unique_str = self.unique_str
+        distributed_by = self.distributed_by
+        schema_madlib = self.schema_madlib
+
+        self.dist_map = 'dist_map' + unique_str
+
+        segmented_source = 'segmented_source' + unique_str
+        self.segmented_source = segmented_source
+        self.build_optimized_tree()
+
+        rowcount_table = 'rowcounts' + unique_str
+        leaf_clusters_table = 'leaf_clusters' + unique_str
+
+        count_rows_sql = """
+            DROP TABLE IF EXISTS {rowcount_table};
+            CREATE TEMP TABLE {rowcount_table} AS
+                WITH i AS (
+                    SELECT
+                        count(*) AS num_rows,
+                        dist_id,
+                        __dist_id__
+                    FROM internal_{segmented_source}
+                    GROUP BY dist_id, __dist_id__
+                ), e AS (
+                    SELECT
+                        count(*) AS num_rows,
+                        dist_id,
+                        __dist_id__
+                    FROM external_{segmented_source}
+                    GROUP BY dist_id, __dist_id__
+                )
+                SELECT
+                    i.num_rows AS num_internal_rows,
+                    COALESCE(e.num_rows,0) AS num_external_rows,
+                    i.dist_id,
+                    __dist_id__
+                FROM i LEFT JOIN e
+                    USING (dist_id, __dist_id__)
+            {distributed_by}
+        """.format(**locals())
+        plpy.execute(count_rows_sql)
+
+        # Run dbscan in parallel on each leaf
+        dbscan_leaves_sql = """
+            CREATE TEMP TABLE {leaf_clusters_table} AS
+                WITH input AS (
+                    SELECT *
+                    FROM internal_{segmented_source}
+                    UNION ALL
+                    SELECT *
+                    FROM external_{segmented_source}
+                ), segmented_source AS (
+                    SELECT
+                        ROW(
+                            id,
+                            point,
+                            FALSE,
+                            0,
+                            leaf_id,
+                            dist_id
+                        )::{schema_madlib}.__dbscan_record AS rec,
+                        __dist_id__
+                    FROM input
+                )
+                SELECT (output).*, __dist_id__ FROM (
+                    SELECT
+                        {schema_madlib}.__dbscan_leaf(
+                            rec,
+                            {self.eps},
+                            {self.min_samples},
+                            '{self.metric}',
+                            num_internal_rows,
+                            num_external_rows
+                        ) AS output,
+                        __dist_id__
+                    FROM segmented_source JOIN {rowcount_table}
+                        USING (__dist_id__)
+                ) a {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS
+                internal_{segmented_source},
+                external_{segmented_source}
+        """.format(**locals()))
+
+        cluster_map = 'cluster_map' + unique_str
+        noise_point = label.NOISE_POINT
+
+        # Replace local cluster_id's in each leaf with
+        # global cluster_id's which are unique across all leaves
+        gen_cluster_map_sql = """
+            CREATE TEMP TABLE {cluster_map} AS
+                SELECT
+                    dist_id,
+                    __dist_id__,
+                    cluster_id AS local_id,
+                    ROW_NUMBER() OVER(
+                        ORDER BY dist_id, cluster_id
+                    ) AS global_id
+                FROM {leaf_clusters_table}
+                WHERE dist_id = leaf_id AND cluster_id != {noise_point}
+                GROUP BY __dist_id__, dist_id, cluster_id
+            {distributed_by}
+        """.format(**locals())
+        plpy.execute(gen_cluster_map_sql)
+
+        globalize_cluster_ids_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = global_id
+            FROM {cluster_map} cm WHERE
+                cm.dist_id = lc.dist_id AND
+                cluster_id = local_id;
+        """.format(**locals())
+        DEBUG.plpy.execute(globalize_cluster_ids_sql)
+
+        intercluster_edges = 'intercluster_edges' + unique_str
+        dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)'
+
+        # Build intercluster table of edges connecting clusters
+        # on different leaves
+        intercluster_edge_sql = """
+            CREATE TEMP TABLE {intercluster_edges} AS
+                SELECT
+                    internal.cluster_id AS src,
+                    external.cluster_id AS dest
+                FROM {leaf_clusters_table} internal
+                JOIN {leaf_clusters_table} external
+                    USING (id)
+                WHERE internal.is_core_point
+                    AND internal.leaf_id = internal.dist_id
+                    AND external.leaf_id != external.dist_id
+                GROUP BY src,dest
+            {dist_by_src}
+        """.format(**locals())
+        DEBUG.plpy.execute(intercluster_edge_sql)
+
+        res = plpy.execute("SELECT count(*) FROM {intercluster_edges}".format(**locals()))
+
+        if int(res[0]['count']) > 0:
+            wcc_table = 'wcc' + unique_str
+            plpy.execute("""
+                DROP TABLE IF EXISTS
+                    {wcc_table}, {wcc_table}_summary,
+                    {self.output_table}
+            """.format(**locals()))
+
+            # Find connected components of intercluster_edge_table
+            wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges,
+                'src=src,dest=dest', wcc_table, None)
+
+            # Rename each cluster_id to its component id in the intercluster graph
+            merge_clusters_sql = """
+                UPDATE {leaf_clusters_table} lc
+                    SET cluster_id = wcc.component_id
+                FROM {wcc_table} wcc
+                WHERE lc.cluster_id = wcc.global_id
+            """.format(**locals())
+            plpy.execute(merge_clusters_sql)
+
+            plpy.execute("""
+                DROP TABLE IF EXISTS
+                    {wcc_table}, {wcc_table}_summary
+            """.format(**locals()))
+
+        add_crossborder_points_sql = """
+            CREATE TABLE {self.output_table} AS
+                SELECT
+                    id, point, cluster_id, is_core_point
+                FROM (
+                    SELECT
+                        id,
+                        point,
+                        is_core_point,
+                        leaf_id = dist_id AS is_internal,
+                        dist_id,
+                        CASE WHEN cluster_id = {noise_point}
+                            THEN MAX(cluster_id) OVER(PARTITION BY id)
+                            ELSE cluster_id
+                        END
+                    FROM {leaf_clusters_table}
+                ) x WHERE is_internal AND cluster_id != {noise_point}
+        """.format(**locals())
+        DEBUG.plpy.execute(add_crossborder_points_sql)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS {rowcount_table},
+                {leaf_clusters_table}, {cluster_map},
+                {intercluster_edges}
+        """.format(**locals()))
+
+    def build_optimized_tree(self):
+        eps = self.eps
+        num_segs = self.num_segs
+        num_points = self.num_points
+        target = num_points / num_segs
+        source_table = self.source_table
+        leaf_bounds_table = 'leaf_bounds' + self.unique_str
+        max_depth = self.max_depth
+        n_dims = self.n_dims
+        dist_map = self.dist_map
+        distributed_by = self.distributed_by
+        internal_points = 'internal_' + self.segmented_source
+        external_points = 'external_' + self.segmented_source
+
+        next_cut = 'next_cut' + self.unique_str
+        prev_node = 'prev_node' + self.unique_str
+
+        first_cut_sql = """
+            DROP TABLE IF EXISTS {prev_node};
+            CREATE TEMP TABLE {prev_node} AS
+            WITH world_bounds AS (
+                SELECT
+                    i,
+                    min(point[i]) AS lower,
+                    max(point[i]) AS upper,
+                    max(point[i]) - min(point[i]) AS size,
+                    ceil((max(point[i]) - min(point[i])) / {eps})::INT AS eps_bins
+                FROM generate_series(1,{n_dims}) AS i, {source_table}
+                GROUP BY i
+            ),
+            density_map AS (
+                SELECT i, eps_bin,
+                    eps_bins,
+                    COALESCE(density, 0) AS density
+                FROM (
+                    SELECT i, eps_bins,
+                        generate_series(0, eps_bins - 1) AS eps_bin
+                    FROM world_bounds
+                ) g LEFT JOIN (
+                    SELECT i,
+                        floor((point[i] - lower)/{eps})::INT AS eps_bin,
+                        eps_bins,
+                        COUNT(*) AS density
+                    FROM
+                        world_bounds,
+                        {source_table}
+                    GROUP BY i, eps_bin, eps_bins
+                ) a
+                USING (i, eps_bin, eps_bins)
+            ),
+            loss_table AS (
+                SELECT i, eps_bin,
+                    left_internal,
+                    {num_points} as points_in_node,
+                    left_external,
+                    right_external,
+                    {self.schema_madlib}.__dbscan_segmentation_loss(
+                        left_internal,
+                        {num_points} - left_internal,
+                        left_external,
+                        right_external,
+                        {num_segs}::BIGINT,
+                        V,
+                        {eps}::DOUBLE PRECISION,
+                        {n_dims}::BIGINT,
+                        eps_bin::DOUBLE PRECISION,
+                        eps_bins::DOUBLE PRECISION
+                    ) AS losses
+                FROM (
+                    SELECT i, eps_bin, eps_bins,
+                        {num_segs}::BIGINT AS num_segs,
+                        COALESCE(density, 0) AS right_external,
+                        COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external,
+                        SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal
+                    FROM (
+                        SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds
+                    ) g LEFT JOIN density_map USING(i, eps_bin)
+                ) params,
+                (
+                    SELECT
+                        {self.schema_madlib}.__dbscan_safe_exp(sum(
+                            {self.schema_madlib}.__dbscan_safe_ln(size)
+                        )) AS V

Review comment:
   It's equivalent to PROD(size)... the product of all of the values in the size column. I implemented it this way because I couldn't find any builtin function that does that. It's possible there is one and I just missed it... if so, that would be cleaner.
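   In other words, V is the product of every value in the size column, computed with the exp-sum-of-logs identity prod(x_i) = exp(sum(ln x_i)), which holds for positive x_i. A minimal sketch of the same trick in plain SQL (world_bounds and size refer to the CTE in first_cut_sql above; the __dbscan_safe_exp/__dbscan_safe_ln wrappers in the patch additionally guard against zeros and overflow, which plain EXP/LN do not):

       -- Product of all values in the "size" column via exp(sum(ln(x))).
       -- Assumes every size is strictly positive; LN of zero or a
       -- negative value raises an error.
       SELECT EXP(SUM(LN(size))) AS V
       FROM world_bounds;

   PostgreSQL ships no built-in product aggregate, so if none turns up, a cleaner-reading alternative would be a one-off custom aggregate. A hypothetical sketch, not part of this patch:

       -- PRODUCT aggregate built on the built-in float8mul(a, b) = a * b.
       CREATE AGGREGATE prod(DOUBLE PRECISION) (
           SFUNC = float8mul,            -- state := state * next value
           STYPE = DOUBLE PRECISION,
           INITCOND = '1.0'
       );

       SELECT prod(size) AS V FROM world_bounds;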