Github user jingyimei commented on a diff in the pull request:

    https://github.com/apache/madlib/pull/178#discussion_r138421619
  
    --- Diff: src/ports/postgres/modules/graph/hits.py_in ---
    @@ -0,0 +1,417 @@
    +# coding=utf-8
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +# HITS
    +
    +# Please refer to the hits.sql_in file for the documentation
    +
    +"""
    +@file hits.py_in
    +
    +@namespace graph
    +"""
    +
    +import math
    +import plpy
    +from utilities.control import MinWarning
    +from utilities.utilities import _assert
    +from utilities.utilities import add_postfix
    +from utilities.utilities import extract_keyvalue_params
    +from utilities.utilities import unique_string
    +from utilities.utilities import is_platform_pg
    +
    +from graph_utils import *
    +
    +
    +def validate_hits_args(schema_madlib, vertex_table, vertex_id, edge_table,
    +                       edge_params, out_table, max_iter, threshold):
    +    """
    +    Function to validate input parameters for HITS
    +    """
    +    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
    +                          out_table, 'HITS')
    +    _assert(not threshold or (threshold >= 0.0 and threshold <= 1.0),
    +            "HITS: Invalid threshold value ({0}), must be between 0 and 1.".
    +            format(threshold))
    +    _assert(max_iter > 0,
    +            """HITS: Invalid max_iter value ({0}), must be a positive integer.""".
    +            format(max_iter))
    +
    +
    +def hits(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
    +         out_table, max_iter, threshold, **kwargs):
    +    """
    +    Function that computes the HITS scores
    +
    +    Args:
    +        @param vertex_table
    +        @param vertex_id
    +        @param edge_table
    +        @param source_vertex
    +        @param dest_vertex
    +        @param out_table
    +        @param max_iter
    +        @param threshold
    +    """
    +    with MinWarning('warning'):
    +        params_types = {'src': str, 'dest': str}
    +        default_args = {'src': 'src', 'dest': 'dest'}
    +        edge_params = extract_keyvalue_params(
    +            edge_args, params_types, default_args)
    +
    +        # populate default values for optional params if null
    +        if max_iter is None:
    +            max_iter = 100
    +        if not vertex_id:
    +            vertex_id = "id"
    +
    +        validate_hits_args(schema_madlib, vertex_table, vertex_id, edge_table,
    +                           edge_params, out_table, max_iter, threshold)
    +        summary_table = add_postfix(out_table, "_summary")
    +        _assert(not table_exists(summary_table),
    +                "Graph HITS: Output summary table ({summary_table}) already exists."
    +                .format(**locals()))
    +
    +        src = edge_params["src"]
    +        dest = edge_params["dest"]
    +        n_vertices = plpy.execute("""
    +                SELECT COUNT({0}) AS cnt
    +                FROM {1}
    +            """.format(vertex_id, vertex_table))[0]["cnt"]
    +
    +        # Assign default threshold value based on number of nodes in the graph.
    +        if threshold is None:
    +            threshold = 1.0 / (n_vertices * 1000)
    +
    +        edge_temp_table = unique_string(desp='temp_edge')
    +        distribution = ('' if is_platform_pg() else
    +                        "DISTRIBUTED BY ({0})".format(dest))
    +        plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
    +        plpy.execute("""
    +                CREATE TEMP TABLE {edge_temp_table} AS
    +                SELECT * FROM {edge_table}
    +                {distribution}
    +            """.format(**locals()))
    +
    +        # GPDB and HAWQ have distributed by clauses to help them with indexing.
    +        # For Postgres we add the index explicitly.
    +        if is_platform_pg():
    +            plpy.execute("CREATE INDEX ON {0}({1})".format(
    +                edge_temp_table, dest))
    +
    +        # Intermediate tables required.
    +        cur = unique_string(desp='cur')
    +        message = unique_string(desp='message')
    +        v1 = unique_string(desp='v1')
    +        message_unconv_authority = unique_string(
    +            desp='message_unconv_authority')
    +        message_unconv_hub = unique_string(desp="message_unconv_hub")
    +        tmp = unique_string(desp='tmp')
    +        tmp2 = unique_string(desp='tmp2')
    +        tmp3 = unique_string(desp='tmp3')
    +        v2 = unique_string(desp='v2')
    +
    +        if is_platform_pg():
    +            cur_distribution = cnts_distribution = ''
    +        else:
    +            cur_distribution = cnts_distribution = \
    +                "DISTRIBUTED BY ({0})".format(vertex_id)
    +        cur_join_clause = " {cur}.{vertex_id} = {edge_temp_table}.{dest}".format(
    +            **locals())
    +        v1_join_clause = "{v1}.{vertex_id} = {edge_temp_table}.{src}".format(
    +            **locals())
    +
    +        authority_init_value = 1.0
    +        hub_init_value = 1.0
    +        plpy.execute("""
    +                CREATE TEMP TABLE {cur} AS
    +                SELECT {vertex_id}, {authority_init_value}::DOUBLE PRECISION AS authority,
    +                {hub_init_value}::DOUBLE PRECISION AS hub
    +                FROM {vertex_table}
    +                {cur_distribution}
    +            """.format(**locals()))
    +
    +        # The summary table contains the total number of iterations.
    +        plpy.execute("""
    +                CREATE TABLE {summary_table} (
    +                    __iterations__ INTEGER
    +                )
    +            """.format(**locals()))
    +
    +        unconverged_authority_num = 0
    +        unconverged_hub_num = 0
    +        iteration_num = 0
    +        authority_norm = 0
    +        hub_norm = 0
    +
    +        plpy.execute("""
    +                CREATE TABLE {message_unconv_authority} AS
    +                SELECT {cur}.{vertex_id}
    +                FROM {cur}
    +                LIMIT 0
    +            """.format(**locals()))
    +        plpy.execute("""
    +                CREATE TABLE {message_unconv_hub} AS
    +                SELECT {cur}.{vertex_id}
    +                FROM {cur}
    +                LIMIT 0
    +            """.format(**locals()))
    +
    +        for iteration_num in range(max_iter):
    +            ###################################################################
    +            # HITS scores for nodes in a graph at any given iteration 'i' is calculated
    +            # as following:
    +            # authority_i(A) = hub_i(B) + hub_i(C) + ..., where B, C are nodes that have
    +            # edges that point to node A
    +            # After calculating authority scores for all nodes, hub scores are calculated
    +            # as following:
    +            # hub_i(A) = authority_i(D) + authority_i(E) + ..., where D, E are nodes that
    +            # A points to
    +            # At the end of each iteration, a normalization will
    +            # be done for all authority scores and hub scores using L2 distance
    +            ###################################################################
    +
    +            ###################################################################
    +            # calculate authority
    +            # if there is no node that point to A, authority_i(A) = 0
    +            ###################################################################
    +            plpy.execute("""
    +                    CREATE TABLE {message} AS
    +                    SELECT {cur}.{vertex_id} AS {vertex_id},
    +                            COALESCE(SUM({v1}.hub), 0.0) AS authority,
    +                            {cur}.hub AS hub
    +                    FROM {cur}
    +                        LEFT JOIN {edge_temp_table} ON {cur_join_clause}
    +                        LEFT JOIN {cur} AS {v1} ON {v1_join_clause}
    +                    GROUP BY {cur}.{vertex_id}, {cur}.hub
    +                    ORDER BY {cur}.{vertex_id}
    +                    {cur_distribution}
    +                """.format(**locals()))
    +
    +            #####################################################
    +            # calculate hub
    +            # if node A doesn't point to any node, hub_i(A) = 0
    +            #####################################################
    +            message_join_clause = "{message}.{vertex_id} = {edge_temp_table}.{src}".format(
    +                **locals())
    +            v2_join_clause = "{v2}.{vertex_id} = {edge_temp_table}.{dest}".format(
    +                **locals())
    +            plpy.execute("""
    +                    UPDATE {message}
    +                    SET hub = {tmp2}.hub
    +                    FROM
    +                    (SELECT {message}.{vertex_id} AS {vertex_id},
    +                            COALESCE(SUM({v2}.authority), 0) AS hub
    +                    FROM {message}
    +                        LEFT JOIN {edge_temp_table} ON {message_join_clause}
    +                        LEFT JOIN {message} AS {v2} ON {v2_join_clause}
    +                    GROUP BY {message}.{vertex_id}) AS {tmp2}
    +                    WHERE {tmp2}.{vertex_id} = {message}.{vertex_id}
    +                """.format(**locals()))
    +
    +            # normalize authority value with L2 distance
    +            authority_norm = math.sqrt(plpy.execute("""
    +                    SELECT SUM({tmp}.authority_square) AS sum_norm_square
    +                    FROM
    +                    (SELECT POWER(authority, 2) AS authority_square
    +                    FROM {message}) AS {tmp}
    +                """.format(**locals()))[0]["sum_norm_square"])
    +
    +            if authority_norm != 0:
    +                plpy.execute("""
    +                        UPDATE {message}
    +                        SET authority = authority/{authority_norm}
    +                    """.format(**locals()))
    +
    +            # normalize hub value with L2 distance
    +            hub_norm = math.sqrt(plpy.execute("""
    +                    SELECT SUM({tmp3}.hub_square) AS sum_hub_square
    +                    FROM
    +                    (SELECT POWER(hub, 2) AS hub_square
    +                    FROM {message}) AS {tmp3}
    +                """.format(**locals()))[0]["sum_hub_square"])
    +
    +            if hub_norm != 0:
    --- End diff --
    
    Yes, this makes more sense.


---

Reply via email to