reductionista commented on a change in pull request #571:
URL: https://github.com/apache/madlib/pull/571#discussion_r709658024



##########
File path: src/ports/postgres/modules/dbscan/dbscan.py_in
##########
@@ -164,76 +274,1271 @@ def dbscan(schema_madlib, source_table, output_table, id_column, expr_point, eps
         """.format(**locals())
         plpy.execute(sql)
 
+        # -- Generate output summary table ---
+
         output_summary_table = add_postfix(output_table, '_summary')
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
 
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(output_summary_table))
         sql = """
             CREATE TABLE {output_summary_table} AS
             SELECT  '{id_column}'::VARCHAR AS id_column,
+                    '{expr_point}'::VARCHAR AS expr_point,
                     {eps}::DOUBLE PRECISION AS eps,
+                    {min_samples}::BIGINT AS min_points,
                     '{metric}'::VARCHAR AS metric
             """.format(**locals())
         plpy.execute(sql)
 
-        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}, {3}".format(
-                     distance_table, core_points_table, core_edge_table,
-                     reachable_points_table))
+class DBSCAN(object):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric):
+        """
+            Args:
+                @param schema_madlib        Name of the Madlib Schema
+                @param source_table         Training data table
+                @param output_table         Output table of points labelled by cluster_id
+                @param eps                  The eps value (neighborhood threshold) defined by the user
+                @param min_samples          Minimum number of points in an eps neighborhood for a core point, defined by the user
+                @param metric               Distance metric selected by the user
+
+        """
+        self.schema_madlib = schema_madlib
+        self.source_table = source_table
+        self.output_table = output_table
+        self.n_dims = num_features(source_table, 'point')
+        self.distributed_by = '' if is_platform_pg() else 'DISTRIBUTED BY (__dist_id__)'
+        self.min_samples = min_samples
+        self.metric = metric
+        # If squared_dist_norm2 is used, we assume eps is set for the squared
+        # distance.  That means the border width must be sqrt(eps)
+        self.eps = sqrt(eps) if metric == DEFAULT_METRIC else eps
 
+class DBSCAN_optimized(DBSCAN):
+    def __init__(self, schema_madlib, source_table, output_table,
+                 eps, min_samples, metric, max_depth=None):
+        """
+            Args:
+                @param schema_madlib        Name of the Madlib Schema
+                @param source_table         Training data table
+                @param output_table         Output table of points labelled by cluster_id
+                @param eps                  The eps value (neighborhood threshold) defined by the user
+                @param min_samples          Minimum number of points in an eps neighborhood for a core point, defined by the user
+                @param metric               Distance metric selected by the user
+        """
+
+        DBSCAN.__init__(self, schema_madlib, source_table, output_table,
+                        eps, min_samples, metric)
+        self.max_depth = max_depth
+        self.unique_str = unique_string('DBSCAN_opt')
+        self.num_segs = get_seg_number()
+        res = plpy.execute("SELECT COUNT(*) FROM 
{source_table}".format(**locals()))
+        self.num_points = res[0]['count']
+
+    def run(self):
+        unique_str = self.unique_str
+        distributed_by = self.distributed_by
+        schema_madlib = self.schema_madlib
+
+        self.dist_map = 'dist_map' + unique_str
+
+        segmented_source = 'segmented_source' + unique_str
+        self.segmented_source = segmented_source
+        self.build_optimized_tree()
+
+        rowcount_table = 'rowcounts' + unique_str
+        leaf_clusters_table = 'leaf_clusters' + unique_str
+
+        count_rows_sql = """
+            DROP TABLE IF EXISTS {rowcount_table};
+            CREATE TEMP TABLE {rowcount_table} AS
+            WITH i AS (
+                SELECT
+                    count(*) AS num_rows,
+                    dist_id,
+                    __dist_id__
+                FROM internal_{segmented_source}
+                GROUP BY dist_id, __dist_id__
+            ), e AS (
+                SELECT
+                    count(*) AS num_rows,
+                    dist_id,
+                    __dist_id__
+                FROM external_{segmented_source}
+                GROUP BY dist_id, __dist_id__
+            )
+            SELECT
+                i.num_rows AS num_internal_rows,
+                COALESCE(e.num_rows,0) AS num_external_rows,
+                i.dist_id,
+                __dist_id__
+            FROM i LEFT JOIN e
+                USING (dist_id, __dist_id__)
+            {distributed_by}
+            """.format(**locals())
+        plpy.execute(count_rows_sql)
+
+        # Run dbscan in parallel on each leaf
+        dbscan_leaves_sql = """
+            CREATE TEMP TABLE {leaf_clusters_table} AS
+            WITH input AS (
+                    SELECT *
+                    FROM internal_{segmented_source}
+                UNION ALL
+                    SELECT *
+                    FROM external_{segmented_source}
+            ), segmented_source AS (
+                SELECT
+                    ROW(
+                        id,
+                        point,
+                        FALSE,
+                        0,
+                        leaf_id,
+                        dist_id
+                    )::{schema_madlib}.__dbscan_record AS rec,
+                    __dist_id__
+                FROM input
+            )
+            SELECT (output).*, __dist_id__ FROM (
+                SELECT
+                    {schema_madlib}.__dbscan_leaf(
+                                                  rec,
+                                                  {self.eps},
+                                                  {self.min_samples},
+                                                  '{self.metric}',
+                                                  num_internal_rows,
+                                                  num_external_rows
+                                                 ) AS output,
+                    __dist_id__
+                FROM segmented_source JOIN {rowcount_table}
+                    USING (__dist_id__)
+            ) a {distributed_by}
+        """.format(**locals())
+        DEBUG.plpy_execute(dbscan_leaves_sql, report_segment_tracebacks=True)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS
+            internal_{segmented_source},
+            external_{segmented_source}
+        """.format(**locals()))
+
+        cluster_map = 'cluster_map' + unique_str
+        noise_point = label.NOISE_POINT
+
+        # Replace local cluster_id's in each leaf with
+        #   global cluster_id's which are unique across all leaves
+        gen_cluster_map_sql = """
+            CREATE TEMP TABLE {cluster_map} AS
+                SELECT
+                    dist_id,
+                    __dist_id__,
+                    cluster_id AS local_id,
+                    ROW_NUMBER() OVER(
+                        ORDER BY dist_id, cluster_id
+                    ) AS global_id
+                FROM {leaf_clusters_table}
+                    WHERE dist_id = leaf_id AND cluster_id != {noise_point}
+                GROUP BY __dist_id__, dist_id, cluster_id
+                {distributed_by}
+        """.format(**locals())
+        plpy.execute(gen_cluster_map_sql)
+
+        globalize_cluster_ids_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = global_id
+            FROM {cluster_map} cm WHERE
+                cm.dist_id = lc.dist_id AND
+                cluster_id = local_id;
+        """.format(**locals())
+        DEBUG.plpy.execute(globalize_cluster_ids_sql)
+
+        intercluster_edges = 'intercluster_edges' + unique_str
+        dist_by_src = '' if is_platform_pg() else 'DISTRIBUTED BY (src)'
+
+        # Build intercluster table of edges connecting clusters
+        #  on different leaves
+        intercluster_edge_sql = """
+            CREATE TEMP TABLE {intercluster_edges} AS
+                SELECT
+                    internal.cluster_id AS src,
+                    external.cluster_id AS dest
+                FROM {leaf_clusters_table} internal
+                JOIN {leaf_clusters_table} external
+                    USING (id)
+                WHERE internal.is_core_point
+                    AND internal.leaf_id  = internal.dist_id
+                    AND external.leaf_id != external.dist_id
+                GROUP BY src,dest
+                {dist_by_src}
+        """.format(**locals())
+        DEBUG.plpy.execute(intercluster_edge_sql)
+
+        res = plpy.execute("SELECT count(*) FROM 
{intercluster_edges}".format(**locals()))
+
+        if int(res[0]['count']) > 0:
+            wcc_table = 'wcc' + unique_str
+            plpy.execute("""
+            DROP TABLE IF EXISTS
+                {wcc_table}, {wcc_table}_summary,
+                {self.output_table}
+            """.format(**locals()))
+
+            # Find connected components of intercluster_edge_table
+            wcc(schema_madlib, cluster_map, 'global_id', intercluster_edges,
+                'src=src,dest=dest', wcc_table, None)
+
+            # Rename each cluster_id to its component id in the intercluster graph
+            merge_clusters_sql = """
+            UPDATE {leaf_clusters_table} lc
+                SET cluster_id = wcc.component_id
+            FROM {wcc_table} wcc
+                WHERE lc.cluster_id = wcc.global_id
+            """.format(**locals())
+            plpy.execute(merge_clusters_sql)
+
+            plpy.execute("""
+            DROP TABLE IF EXISTS
+                {wcc_table}, {wcc_table}_summary
+            """.format(**locals()))
+
+        add_crossborder_points_sql = """
+            CREATE TABLE {self.output_table} AS
+            SELECT
+                id, point, cluster_id, is_core_point
+            FROM (
+                SELECT
+                    id,
+                    point,
+                    is_core_point,
+                    leaf_id = dist_id AS is_internal,
+                    dist_id,
+                    CASE WHEN cluster_id = {noise_point}
+                        THEN MAX(cluster_id) OVER(PARTITION BY id)
+                        ELSE cluster_id
+                    END
+                FROM {leaf_clusters_table}
+            ) x WHERE is_internal AND cluster_id != {noise_point}
+        """.format(**locals())
+        DEBUG.plpy.execute(add_crossborder_points_sql)
+
+        plpy.execute("""
+            DROP TABLE IF EXISTS {rowcount_table},
+            {leaf_clusters_table}, {cluster_map},
+            {intercluster_edges}
+        """.format(**locals()))
+
+    def build_optimized_tree(self):
+        eps = self.eps
+        num_segs = self.num_segs
+        num_points = self.num_points
+        target = num_points / num_segs
+        source_table = self.source_table
+        leaf_bounds_table = 'leaf_bounds' + self.unique_str
+        max_depth = self.max_depth
+        n_dims = self.n_dims
+        dist_map = self.dist_map
+        distributed_by = self.distributed_by
+        internal_points = 'internal_' + self.segmented_source
+        external_points = 'external_' + self.segmented_source
+
+        next_cut = 'next_cut' + self.unique_str
+        prev_node = 'prev_node' + self.unique_str
+
+        first_cut_sql = """
+            DROP TABLE IF EXISTS {prev_node};
+            CREATE TEMP TABLE {prev_node} AS
+            WITH world_bounds AS (
+                SELECT
+                    i,
+                    min(point[i]) AS lower,
+                    max(point[i]) AS upper,
+                    max(point[i]) - min(point[i]) AS size,
+                    ceil((max(point[i]) - min(point[i])) / {eps})::INT AS eps_bins
+                FROM generate_series(1,{n_dims}) AS i, {source_table}
+                GROUP BY i
+            ),
+            density_map AS (
+                SELECT i, eps_bin,
+                    eps_bins,
+                    COALESCE(density, 0) AS density
+                FROM (
+                    SELECT i, eps_bins,
+                        generate_series(0, eps_bins - 1) AS eps_bin
+                    FROM world_bounds
+                ) g LEFT JOIN (
+                    SELECT i,
+                        floor((point[i] - lower)/{eps})::INT AS eps_bin,
+                        eps_bins,
+                        COUNT(*) AS density
+                    FROM
+                        world_bounds,
+                        {source_table}
+                    GROUP BY i, eps_bin, eps_bins
+                ) a
+                USING (i, eps_bin, eps_bins)
+            ),
+            loss_table AS (
+                SELECT i, eps_bin,
+                    left_internal,
+                    {num_points} as points_in_node,
+                    left_external,
+                    right_external,
+                    {self.schema_madlib}.__dbscan_segmentation_loss(
+                        left_internal,
+                        {num_points} - left_internal,
+                        left_external,
+                        right_external,
+                        {num_segs}::BIGINT,
+                        V,
+                        {eps}::DOUBLE PRECISION,
+                        {n_dims}::BIGINT,
+                        eps_bin::DOUBLE PRECISION,
+                        eps_bins::DOUBLE PRECISION
+                    ) AS losses
+                FROM (
+                    SELECT i, eps_bin, eps_bins,
+                        {num_segs}::BIGINT AS num_segs,
+                        COALESCE(density, 0) AS right_external,
+                        COALESCE(LEAD(density) OVER(PARTITION BY i ORDER BY eps_bin), 0) AS left_external,
+                        SUM(density) OVER(PARTITION BY i ORDER BY eps_bin)::BIGINT AS left_internal
+                    FROM (
+                        SELECT i, generate_series(0, eps_bins - 1) AS eps_bin FROM world_bounds
+                    ) g LEFT JOIN density_map USING(i, eps_bin)
+                ) params,
+                (
+                    SELECT
+                        {self.schema_madlib}.__dbscan_safe_exp(sum(
+                            {self.schema_madlib}.__dbscan_safe_ln(size)
+                        )) AS V

Review comment:
       It's equivalent to PROD(size)... the product of all of the values in the
size column. I implemented it this way because I couldn't find any built-in
aggregate function that does that. It's possible there is one and I just
missed it... if so, that would be cleaner.
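
For reference, here is the same product-via-logs trick as a standalone query
(plain PostgreSQL, no MADlib helpers). It assumes every value is strictly
positive; presumably that is what the __dbscan_safe_ln / __dbscan_safe_exp
wrappers in the patch are guarding against in the general case:

    -- exp(sum(ln(x))) gives the product of x over all rows,
    -- because ln(a) + ln(b) = ln(a*b)
    SELECT exp(sum(ln(size))) AS product_of_sizes
    FROM (VALUES (2.0), (3.0), (4.0)) AS t(size);
    -- returns 24 (up to numeric rounding)

PostgreSQL has no built-in PRODUCT() aggregate, so the exp-of-sum-of-logs form
(or a custom aggregate) is the usual workaround.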



