This is an automated email from the ASF dual-hosted git repository. xunh pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-bluemarlin.git
The following commit(s) were added to refs/heads/main by this push: new e9316a4 Update lookalike application new 371e4b2 Merge pull request #13 from radibnia77/main e9316a4 is described below commit e9316a40cac096e6de44730e1c49acd448e899c0 Author: Reza <reza_a...@yahoo.com> AuthorDate: Thu Sep 16 15:09:50 2021 -0700 Update lookalike application These are changes for lookalike application 1. User consolidation for fast processing 2. Add Integration tests 3. Refactoring --- .../application/pipeline/dags => }/README.md | 0 Model/lookalike-model/doc/ssd-v2.pdf | Bin 406365 -> 0 bytes Model/lookalike-model/doc/ssd-v3.pdf | Bin 0 -> 431277 bytes .../{pipeline/dags/README.md => __init__.py} | 0 .../application/legacy_files/distance_table.py | 95 ------- .../legacy_files/distance_table_list.py | 144 ----------- .../application/pipeline/__init__.py | 15 ++ .../application/pipeline/config.yml | 34 ++- .../lookalike_model/application/pipeline/run.sh | 14 +- .../application/pipeline/score_generator.py | 21 +- ...vector_rebucketing.py => score_matrix_table.py} | 47 ++-- .../application/pipeline/score_vector_table.py | 17 +- .../application/pipeline/seed_user_selector.py | 64 ----- .../pipeline/top_n_similarity_table_generator.py | 111 ++++---- .../lookalike_model/application/rest_client.py | 89 ------- .../validation_plan_0.py} | 10 +- .../application/validations/validation_plan_1.py | 94 +++++++ Model/lookalike-model/lookalike_model/config.yml | 6 +- .../lookalike_model/pipeline/main_clean.py | 16 +- .../lookalike_model/pipeline/main_keywords.py | 113 +++++++++ .../lookalike_model/pipeline/main_trainready.py | 59 +---- Model/lookalike-model/lookalike_model/run.sh | 7 + .../pipeline/dags => tests/application}/README.md | 0 .../pipeline/config_score_matrix_table.yml | 38 +++ .../pipeline/config_top_n_similarity.yml | 38 +++ .../pipeline/test_score_matrix_table.py | 224 ++++++++++++++++ .../test_top_n_similarity_table_generator_1.py | 183 +++++++++++++ 
.../test_top_n_similarity_table_generator_2.py | 282 +++++++++++++++++++++ .../pipeline/dags => tests/pipeline}/README.md | 0 .../tests/pipeline/config_clean.yml | 2 + .../tests/pipeline/config_keywords.yml | 22 ++ .../tests/pipeline/data_generator.py | 87 +++++-- .../tests/pipeline/test_main_clean.py | 24 +- .../tests/pipeline/test_main_keywords.py | 88 +++++++ Model/lookalike-model/tests/run_test.sh | 23 +- 35 files changed, 1376 insertions(+), 591 deletions(-) diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md b/Model/lookalike-model/README.md similarity index 100% copy from Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md copy to Model/lookalike-model/README.md diff --git a/Model/lookalike-model/doc/ssd-v2.pdf b/Model/lookalike-model/doc/ssd-v2.pdf deleted file mode 100644 index 0c0502d..0000000 Binary files a/Model/lookalike-model/doc/ssd-v2.pdf and /dev/null differ diff --git a/Model/lookalike-model/doc/ssd-v3.pdf b/Model/lookalike-model/doc/ssd-v3.pdf new file mode 100644 index 0000000..3729198 Binary files /dev/null and b/Model/lookalike-model/doc/ssd-v3.pdf differ diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md b/Model/lookalike-model/lookalike_model/application/__init__.py similarity index 100% copy from Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md copy to Model/lookalike-model/lookalike_model/application/__init__.py diff --git a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py deleted file mode 100644 index e82c018..0000000 --- a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table.py +++ /dev/null @@ -1,95 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0.html - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import yaml -import argparse -from pyspark import SparkContext -from pyspark.sql import HiveContext -from pyspark.sql.functions import lit, col, udf -from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType -# from rest_client import predict, str_to_intlist -import requests -import json -import argparse -from pyspark.sql.functions import udf -from math import sqrt -import time - - - - -def distance(l1): - def _distance(l2): - dist = sum([l1[el]*l2[el] for el,value in l1.items()]) - return dist - return _distance - -def x(l1): - _udf_distance = udf(distance(l1), FloatType() ) - return _udf_distance - -def run(hive_context, cfg): - - # input tables - keywords_table = cfg["input"]["keywords_table"] - seeduser_table = cfg["input"]["seeduser_table"] - lookalike_loaded_table_norm = cfg['output']['gucdocs_loaded_table_norm'] - - # output dataframes - lookalike_score_table = cfg["output"]["score_table"] - - command = "SELECT * FROM {}" - df = hive_context.sql(command.format(lookalike_loaded_table_norm)) - df_keywords = hive_context.sql(command.format(keywords_table)) - df_seed_user = hive_context.sql(command.format(seeduser_table)) - - - #### creating a tuple of did and kws for seed users - df_seed_user = df_seed_user.join(df.select('did','kws_norm'), 
on=['did'], how='left') - # df_seed_user = df_seed_user.withColumn("seed_user_list", zip_("did", "kws")) - seed_user_list = df_seed_user.select('did','kws_norm').collect() - # seed_user list = [(did1, {k1:0, k2:0.2, ...}), (did2, )] - # user = - c = 0 - temp_list = [] - for item in seed_user_list: - - c+= 1 - if c > 850 : - break - df = df.withColumn(item[0],x(item[1])(col('kws_norm'))) - - df.write.option("header", "true").option( - "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_score_table) - - - -if __name__ == "__main__": - start = time.time() - parser = argparse.ArgumentParser(description=" ") - parser.add_argument('config_file') - args = parser.parse_args() - with open(args.config_file, 'r') as yml_file: - cfg = yaml.safe_load(yml_file) - - sc = SparkContext.getOrCreate() - sc.setLogLevel('WARN') - hive_context = HiveContext(sc) - - run(hive_context=hive_context, cfg=cfg) - sc.stop() - end = time.time() - print('Runtime of the program is:', (end - start)) diff --git a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py b/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py deleted file mode 100644 index 2217994..0000000 --- a/Model/lookalike-model/lookalike_model/application/legacy_files/distance_table_list.py +++ /dev/null @@ -1,144 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0.html - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import yaml -from pyspark import SparkContext -from pyspark.sql import HiveContext -from pyspark.sql.functions import lit, col, udf, array, mean -from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType -import argparse -from pyspark.sql.functions import udf -import time -import math - -''' -spark-submit --executor-memory 16G --driver-memory 24G --num-executors 16 --executor-cores 5 --master yarn --conf spark.driver.maxResultSize=8g distance_table_list.py config.yml -''' - - -def euclidean(l1): - def _euclidean(l2): - list = [] - for item in l1: - similarity = 1 - (math.sqrt(sum([(item[i]-l2[i]) ** 2 for i in range(len(item))]))/math.sqrt(len(item))) - list.append(similarity) - return list - return _euclidean - - -def dot(l1): - def _dot(l2): - list = [] - for item in l1: - similarity = sum([item[i]*l2[i] for i in range(len(item))]) - list.append(similarity) - return list - return _dot - - -def ux(l1): - if alg == "euclidean": - _udf_similarity = udf(euclidean(l1), ArrayType(FloatType())) - if alg =="dot": - _udf_similarity = udf(dot(l1), ArrayType(FloatType())) - return _udf_similarity - - - -def l(d): - s = [value for key, value in d.items()] - return s -udf_tolist = udf(l, ArrayType(FloatType())) - -def top_n(l): - #### top 10 - n = 10 - l.sort() - return l[-n:] -udf_top_n = udf(top_n, ArrayType(FloatType())) - -def _top_n(l1, l2): - n = 10 - l = sorted(l1+l2) - return l[-n:] - -_udf_top_n = udf(_top_n, ArrayType(FloatType())) - -def _mean(l): - ave = sum(l)/len(l) - return ave -udf_mean = udf(_mean, FloatType()) 
- -def run(hive_context, cfg): - - ## load dataframes - lookalike_score_table_norm = cfg['output']['score_norm_table'] - keywords_table = cfg["input"]["keywords_table"] - seeduser_table = cfg["input"]["seeduser_table"] - lookalike_similarity_table = cfg["output"]["similarity_table"] - - command = "SELECT * FROM {}" - df = hive_context.sql(command.format(lookalike_score_table_norm)) - df_keywords = hive_context.sql(command.format(keywords_table)) - df_seed_user = hive_context.sql(command.format(seeduser_table)) - - - #### creating a tuple of did and kws for seed users - if alg == "dot": - df = df.withColumn('kws_norm_list', udf_tolist(col('kws_norm'))) - if alg == "euclidean": - df = df.withColumn('kws_norm_list', udf_tolist(col('kws'))) - df_seed_user = df_seed_user.join(df.select('did','kws_norm_list'), on=['did'], how='left') - seed_user_list = df_seed_user.select('did', 'kws_norm_list').collect() - -## batch 1 : 0-100 801 seed - batch_length = 800 - c = 0 - #### i=0, c=0 , batched_user=[0,200], top_10 - total_c = len(seed_user_list) - df = df.withColumn('top_10', array(lit(0.0))) - while total_c > 0 : - len_tobe_p = min(batch_length,total_c) - total_c-= len_tobe_p - batched_user = [item[1] for item in seed_user_list[c: c+len_tobe_p]] - df = df.withColumn("similarity_list",ux(batched_user)(col('kws_norm_list'))) - df = df.withColumn("top_10", _udf_top_n(col("similarity_list"),col("top_10"))) - c+=len_tobe_p - - df = df.withColumn("mean_score",udf_mean(col("top_10"))) - df.write.option("header", "true").option( - "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(lookalike_similarity_table) - extended_did = df.sort(col('mean_score').desc()).select('did', 'mean_score') - - - -if __name__ == "__main__": - start = time.time() - parser = argparse.ArgumentParser(description=" ") - parser.add_argument('config_file') - args = parser.parse_args() - with open(args.config_file, 'r') as yml_file: - cfg = yaml.safe_load(yml_file) - - sc = 
SparkContext.getOrCreate() - sc.setLogLevel('WARN') - hive_context = HiveContext(sc) - - ## select similarity algorithm - alg = cfg["input"]["alg"] - run(hive_context=hive_context, cfg=cfg) - sc.stop() - end = time.time() - print('Runtime of the program is:', (end - start)) diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py new file mode 100644 index 0000000..3e46a55 --- /dev/null +++ b/Model/lookalike-model/lookalike_model/application/pipeline/__init__.py @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
\ No newline at end of file diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml index cd941f6..485f7be 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/config.yml +++ b/Model/lookalike-model/lookalike_model/application/pipeline/config.yml @@ -1,35 +1,31 @@ -product_tag: 'lookalike' -pipeline_tag: '08042021' +product_tag: 'lookalike_application' +pipeline_tag: '08172021_1000' score_generator: input: - log_table : "lookalike_03042021_logs" - did_table: "lookalike_03042021_trainready" + log_table : "lookalike_08172021_1000_logs" + did_table: "lookalike_08172021_1000_trainready" keywords_table: "din_ad_keywords_09172020" - test_table: "lookalike_trainready_jimmy_test" + significant_keywords_table: "lookalike_08172021_1000_keywords" din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict" din_model_length: 20 - seeduser_table : "lookalike_seeduser" - number_of_seeduser: 1000 extend: 2000 alg: "euclidean" ##### currently just support "euclideand" and "dot" output: - did_score_table: "{product_tag}_score_{pipeline_tag}" - score_norm_table: "{product_tag}_score_norm_{pipeline_tag}" - normalize: False - + score_table: "{product_tag}_{pipeline_tag}_score" + normalize: False score_vector: keywords_table: "din_ad_keywords_09172020" - score_norm_table: "{product_tag}_score_norm_{pipeline_tag}" - score_vector_table: "{product_tag}_score_vector_{pipeline_tag}" + score_table: "{product_tag}_{pipeline_tag}_score" + score_vector_table: "{product_tag}_{pipeline_tag}_score_vector" did_bucket_size: 2 did_bucket_step: 2 -score_vector_rebucketing: +score_matrix_table: did_bucket_size: 2 did_bucket_step: 2 - alpha_did_bucket_size: 20 #default=1000 - score_vector_alpha_table: '{product_tag}_score_vector_alpha_{pipeline_tag}' + score_matrix_table: '{product_tag}_{pipeline_tag}_score_matrix' top_n_similarity: - did_bucket_step: 1 - 
alpha_did_bucket_step: 10 + did_bucket_size: 100 + did_bucket_step: 100 + cross_bucket_size: 1 # in production this should be as same as did_bucket_size top_n: 10 - similarity_table: "{product_tag}_similarity_{pipeline_tag}" \ No newline at end of file + similarity_table: "{product_tag}_{pipeline_tag}_similarity" \ No newline at end of file diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh index d5bf875..a141ebc 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/run.sh +++ b/Model/lookalike-model/lookalike_model/application/pipeline/run.sh @@ -1,16 +1,10 @@ #!/bin/bash -# Not used as part of pipeline -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict seed_user_selector.py config.yml "29" +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py config.yml -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_generator.py config.yml +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G 
--conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matrix_table.py config.yml -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml - -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml - -# Not used as part of pipeline -spark-submit ---master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py config.yml "29" +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 16G --driver-memory 16G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict top_n_similarity_table_generator.py config.yml diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py index 2dbebda..6dd86b2 100644 --- 
a/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py +++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_generator.py @@ -29,7 +29,7 @@ from util import resolve_placeholder from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition ''' -This process generates the score-norm-table with the following format. +This process generates the score-table with the following format. DataFrame[age: int, gender: int, did: string, did_index: bigint, interval_starting_time: array<string>, interval_keywords: array<string>, @@ -55,7 +55,7 @@ def str_to_intlist(table): return ji -def inputData(record, keyword, length): +def input_data(record, keyword, length): if len(record['show_counts']) >= length: hist = flatten(record['show_counts'][:length]) instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)} @@ -69,7 +69,7 @@ def inputData(record, keyword, length): def predict(serving_url, record, length, new_keyword): body = {'instances': []} for keyword in new_keyword: - instance = inputData(record, keyword, length) + instance = input_data(record, keyword, length) body['instances'].append(instance) body_json = json.dumps(body) result = requests.post(serving_url, data=body_json).json() @@ -174,18 +174,21 @@ if __name__ == '__main__': hive_context = HiveContext(sc) # load dataframes - did_table, keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][ - 'keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], cfg['score_generator']['input']['din_model_length'] + did_table, keywords_table, significant_keywords_table, din_tf_serving_url, length = cfg['score_generator']['input']['did_table'], cfg['score_generator']['input'][ + 'keywords_table'], cfg['score_generator']['input'][ + 'significant_keywords_table'], cfg['score_generator']['input']['din_model_tf_serving_url'], 
cfg['score_generator']['input']['din_model_length'] command = 'SELECT * FROM {}' df_did = hive_context.sql(command.format(did_table)) - df_keywords = hive_context.sql(command.format(keywords_table)) + + command = 'SELECT T1.keyword,T1.spread_app_id,T1.keyword_index FROM {} AS T1 JOIN {} AS T2 ON T1.keyword=T2.keyword' + df_keywords = hive_context.sql(command.format(keywords_table, significant_keywords_table)) # temporary adding to filter based on active keywords df_keywords = df_keywords.filter((df_keywords.keyword == 'video') | (df_keywords.keyword == 'shopping') | (df_keywords.keyword == 'info') | (df_keywords.keyword == 'social') | (df_keywords.keyword == 'reading') | (df_keywords.keyword == 'travel') | (df_keywords.keyword == 'entertainment')) - did_loaded_table = cfg['score_generator']['output']['did_score_table'] - score_norm_table = cfg['score_generator']['output']['score_norm_table'] + + score_table = cfg['score_generator']['output']['score_table'] # create a CTR score generator instance and run to get the loaded did ctr_score_generator = CTRScoreGenerator(df_did, df_keywords, din_tf_serving_url, length) @@ -198,4 +201,4 @@ if __name__ == '__main__': df = df.withColumn('kws_norm', udf_normalize(col('kws'))) # save the loaded did to hive table - write_to_table_with_partition(df, score_norm_table, partition=('did_bucket'), mode='overwrite') + write_to_table_with_partition(df, score_table, partition=('did_bucket'), mode='overwrite') diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py similarity index 62% rename from Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py rename to Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py index 7cae65c..ee82c06 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_rebucketing.py +++ 
b/Model/lookalike-model/lookalike_model/application/pipeline/score_matrix_table.py @@ -17,8 +17,8 @@ import yaml import argparse from pyspark import SparkContext -from pyspark.sql import HiveContext -from pyspark.sql.functions import lit, col, udf +from pyspark.sql import HiveContext, SparkSession, Window +from pyspark.sql.functions import lit, col, udf, collect_list from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, IntegerType # from rest_client import predict, str_to_intlist import requests @@ -36,41 +36,41 @@ from lookalike_model.pipeline.util import write_to_table, write_to_table_with_pa ''' To run, execute the following in application folder. -spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_rebucketing.py config.yml +spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_matirx_table.py config.yml -This process generates added secondary buckects ids (alpha-did-bucket). +This process consolidates bucket score vectors into matrices. 
''' -def assign_new_bucket_id(df, n, new_column_name): - def __hash_sha256(s): - hex_value = hashlib.sha256(s.encode('utf-8')).hexdigest() - return int(hex_value, 16) - _udf = udf(lambda x: __hash_sha256(x) % n, IntegerType()) - df = df.withColumn(new_column_name, _udf(df.did)) - return df - - -def run(hive_context, cfg): +def run(spark_session, hive_context, cfg): score_vector_table = cfg['score_vector']['score_vector_table'] - bucket_size = cfg['score_vector_rebucketing']['did_bucket_size'] - bucket_step = cfg['score_vector_rebucketing']['did_bucket_step'] - alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_did_bucket_size'] - score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + bucket_size = cfg['score_matrix_table']['did_bucket_size'] + bucket_step = cfg['score_matrix_table']['did_bucket_step'] + score_matrix_table = cfg['score_matrix_table']['score_matrix_table'] first_round = True + num_batches = (bucket_size + bucket_step - 1) / bucket_step + batch_num = 1 for did_bucket in range(0, bucket_size, bucket_step): - command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, did_bucket+bucket_step-1) + print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket)) + + max_bucket = min(did_bucket+bucket_step-1, bucket_size) + command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_vector_table, did_bucket, max_bucket) + # command = "SELECT did_bucket, collect_list(struct(did, score_vector, c1)) AS item FROM {} WHERE did_bucket BETWEEN {} AND {} GROUP BY did_bucket".format(score_vector_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size)) df = hive_context.sql(command) - df = assign_new_bucket_id(df, alpha_bucket_size, 'alpha_did_bucket') + df = df.groupBy('did_bucket').agg( + collect_list('did').alias('did_list'), + 
collect_list('score_vector').alias('score_matrix'), + collect_list('c1').alias('c1_list')) mode = 'overwrite' if first_round else 'append' - write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket', 'alpha_did_bucket'), - score_vector_alpha_table, partition=('did_bucket', 'alpha_did_bucket'), mode=mode) + write_to_table_with_partition(df.select('did_list', 'score_matrix', 'c1_list', 'did_bucket'), + score_matrix_table, partition=('did_bucket'), mode=mode) first_round = False + batch_num += 1 if __name__ == "__main__": @@ -84,8 +84,9 @@ if __name__ == "__main__": sc = SparkContext.getOrCreate() sc.setLogLevel('WARN') hive_context = HiveContext(sc) + spark_session = SparkSession(sc) - run(hive_context=hive_context, cfg=cfg) + run(spark_session, hive_context=hive_context, cfg=cfg) sc.stop() end = time.time() print('Runtime of the program is:', (end - start)) diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py index 5c13fc8..aa8ee51 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py +++ b/Model/lookalike-model/lookalike_model/application/pipeline/score_vector_table.py @@ -36,7 +36,7 @@ from util import resolve_placeholder ''' To run, execute the following in application folder. 
-spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict score_vector_table.py config.yml This process generates the score_vector_table table. @@ -50,10 +50,11 @@ The top-n-similarity table is ''' + def run(hive_context, cfg): keywords_table = cfg["score_vector"]["keywords_table"] - score_norm_table = cfg['score_vector']['score_norm_table'] + score_table = cfg['score_vector']['score_table'] score_vector_table = cfg['score_vector']['score_vector_table'] bucket_size = cfg['score_vector']['did_bucket_size'] bucket_step = cfg['score_vector']['did_bucket_step'] @@ -65,8 +66,12 @@ def run(hive_context, cfg): # add score-vector iterativly first_round = True + num_batches = (bucket_size + bucket_step - 1) / bucket_step + batch_num = 1 for did_bucket in range(0, bucket_size, bucket_step): - command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_norm_table, did_bucket, did_bucket+bucket_step-1) + print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket)) + + command = "SELECT did, did_bucket, kws FROM {} WHERE did_bucket BETWEEN {} AND {}".format(score_table, did_bucket, min(did_bucket+bucket_step-1, bucket_size)) # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0 | # [social -> 0.24231663, entertainment -> 0.20828941, reading -> 0.44120282, video -> 0.34497723, travel -> 0.3453492, shopping -> 0.5347804, info -> 0.1978679]| @@ -75,10 +80,12 @@ def run(hive_context, cfg): udf(lambda kws: 
[kws[keyword] if keyword in kws else 0.0 for keyword in keywords], ArrayType(FloatType()))(df.kws)) df = df.withColumn('c1', udf(lambda x: float(np.array(x).dot(np.array(x))), FloatType())(df.score_vector)) - + mode = 'overwrite' if first_round else 'append' write_to_table_with_partition(df.select('did', 'score_vector', 'c1', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode) + # write_to_table_with_partition(df.select('did', 'did_bucket'), score_vector_table, partition=('did_bucket'), mode=mode) first_round = False + batch_num += 1 if __name__ == "__main__": @@ -90,7 +97,7 @@ if __name__ == "__main__": cfg = yaml.safe_load(yml_file) resolve_placeholder(cfg) sc = SparkContext.getOrCreate() - sc.setLogLevel('WARN') + sc.setLogLevel('INFO') hive_context = HiveContext(sc) run(hive_context=hive_context, cfg=cfg) diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py b/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py deleted file mode 100644 index 691d588..0000000 --- a/Model/lookalike-model/lookalike_model/application/pipeline/seed_user_selector.py +++ /dev/null @@ -1,64 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0.html - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from pyspark import SparkContext -from pyspark.sql import HiveContext -import yaml -import argparse -from util import resolve_placeholder - - -''' -input: logfile -output: lookalike_seeduser table - - -''' - - -def run(hive_context, cfg, kwi): - seed_user_table = cfg['input']['seeduser_table'] - log_table = cfg['input']['log_table'] - number_of_seeduser = cfg['input']['number_of_seeduser'] - - # command = "select * from (select * from {} where is_click=1 and keyword_index=29) as s join (select * from {} where is_click=1 and keyword_index=26) as b on b.did = s.did where s.gender = 1" - command = "SELECT * FROM {} WHERE is_click=1 AND keyword_index={}" - df = hive_context.sql(command.format(log_table, kwi)) - user_list = df.select('did').alias('did').distinct().limit(number_of_seeduser) - user_list.cache() - - user_list.write.option("header", "true").option( - "encoding", "UTF-8").mode("overwrite").format('hive').saveAsTable(seed_user_table) - - -if __name__ == "__main__": - """ - select seed users - """ - parser = argparse.ArgumentParser(description=" ") - parser.add_argument('config_file') - parser.add_argument('kwi') - args = parser.parse_args() - kwi = args.kwi - with open(args.config_file, 'r') as yml_file: - cfg = yaml.safe_load(yml_file) - resolve_placeholder(cfg) - sc = SparkContext.getOrCreate() - sc.setLogLevel('WARN') - hive_context = HiveContext(sc) - - run(hive_context=hive_context, cfg=cfg, kwi=kwi) - sc.stop() diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py index 66363c5..60071b6 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py +++ b/Model/lookalike-model/lookalike_model/application/pipeline/top_n_similarity_table_generator.py @@ -19,22 +19,16 @@ import argparse import pyspark.sql.functions as fn from pyspark import SparkContext -from 
pyspark.sql import HiveContext -from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, StructType +from pyspark.sql import HiveContext, SparkSession +from pyspark.sql.types import FloatType, StringType, StructType, StructField, ArrayType, MapType, IntegerType +from pyspark.sql.functions import udf, col, explode -# from rest_client import predict, str_to_intlist -import requests -import json -import argparse -from pyspark.sql.functions import udf from math import sqrt import time import numpy as np import itertools import heapq from util import resolve_placeholder - - from lookalike_model.pipeline.util import write_to_table, write_to_table_with_partition ''' @@ -53,65 +47,87 @@ The top-n-similarity table is ''' -def run(sc, hive_context, cfg): +def run(spark_session, hive_context, cfg): - score_vector_alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + score_matrix_table = cfg['score_matrix_table']['score_matrix_table'] similarity_table = cfg['top_n_similarity']['similarity_table'] - N = cfg['top_n_similarity']['top_n'] - - did_bucket_size = cfg['score_vector_rebucketing']['did_bucket_size'] + did_bucket_size = cfg['top_n_similarity']['did_bucket_size'] did_bucket_step = cfg['top_n_similarity']['did_bucket_step'] - - alpha_bucket_size = cfg['score_vector_rebucketing']['alpha_did_bucket_size'] - alpha_bucket_step = cfg['top_n_similarity']['alpha_did_bucket_step'] + cross_bucket_size = cfg['top_n_similarity']['cross_bucket_size'] + top_n_value = cfg['top_n_similarity']['top_n'] first_round = True + num_batches = (did_bucket_size + did_bucket_step - 1) / did_bucket_step + batch_num = 1 for did_bucket in range(0, did_bucket_size, did_bucket_step): + print('Processing batch {} of {} bucket number: {}'.format(batch_num, num_batches, did_bucket)) - command = "SELECT did, did_bucket, score_vector, c1 FROM {} WHERE did_bucket BETWEEN {} AND {}".format( - score_vector_alpha_table, did_bucket, did_bucket + 
did_bucket_step - 1) + command = "SELECT did_list, did_bucket, score_matrix, c1_list FROM {} WHERE did_bucket BETWEEN {} AND {}".format( + score_matrix_table, did_bucket, min(did_bucket + did_bucket_step - 1, did_bucket_size)) # |0004f3b4731abafa9ac54d04cb88782ed61d30531262decd799d91beb6d6246a|0 | # [0.24231663, 0.20828941, 0.0]| df = hive_context.sql(command) df = df.withColumn('top_n_similar_user', fn.array()) - for alpha_bucket in range(0, alpha_bucket_size, alpha_bucket_step): - command = """SELECT did, score_vector, c1, alpha_did_bucket - FROM {} WHERE alpha_did_bucket BETWEEN {} AND {}""" - command = command.format(score_vector_alpha_table, - alpha_bucket, alpha_bucket + alpha_bucket_step - 1) + for cross_bucket in range(0, cross_bucket_size): + print('Processing batch {}, alpha bucket {}'.format(batch_num, cross_bucket)) + + command = """SELECT did_list, score_matrix, c1_list, did_bucket + FROM {} WHERE did_bucket = {} """ + command = command.format(score_matrix_table, cross_bucket) df_user = hive_context.sql(command) - block_user = df_user.select('did', 'score_vector', 'c1').collect() - block_user_did_score = ([_['did'] for _ in block_user], [_['score_vector'] for _ in block_user]) - block_user_broadcast = sc.broadcast(block_user_did_score) - - c2 = np.array([_['c1'] for _ in block_user]) - c2 = np.square(np.linalg.norm(c2)).tolist() - c2_broadcast = sc.broadcast(c2) - - def calculate_similarity(user_score_vector, top_n_user_score, c1): - m = len(user_score_vector) - user_score_vector = np.array(user_score_vector) - dids, other_score_vectors = block_user_broadcast.value - other_score_vectors = np.array(other_score_vectors) - cross_mat = np.matmul(user_score_vector, other_score_vectors.transpose()) - c2 = np.array(c2_broadcast.value) - similarity = np.sqrt(m) - np.sqrt(c1 + c2 - 2 * cross_mat) - user_score_s = list(itertools.izip(dids, similarity.tolist())) - user_score_s.extend(top_n_user_score) - user_score_s = heapq.nlargest(N, user_score_s, key=lambda x: 
x[1]) - return user_score_s + cross_users = df_user.select('did_list', 'score_matrix', 'c1_list').collect() + + if len(cross_users) == 0: + continue + + cross_users_did_score = (cross_users[0]['did_list'], cross_users[0]['score_matrix']) + c2 = np.array(cross_users[0]['c1_list']) + + def calculate_similarity(cross_users_did_score, c2): + def __helper(user_score_matrix, top_n_user_score, c1_list): + user_score_matrix = np.array(user_score_matrix) + m = user_score_matrix.shape[1] + cross_dids, cross_score_matrix = cross_users_did_score + cross_score_matrix = np.array(cross_score_matrix) + cross_mat = np.matmul(user_score_matrix, cross_score_matrix.transpose()) + + similarity = np.sqrt(m) - np.sqrt(np.maximum(np.expand_dims(c1_list, 1) + c2 - (2 * cross_mat), 0.0)) + result = [] + for cosimilarity, top_n in itertools.izip_longest(similarity, top_n_user_score, fillvalue=[]): + user_score_s = list(itertools.izip(cross_dids, cosimilarity.tolist())) + user_score_s.extend(top_n) + user_score_s = heapq.nlargest(top_n_value, user_score_s, key=lambda x: x[1]) + result.append(user_score_s) + return result + return __helper elements_type = StructType([StructField('did', StringType(), False), StructField('score', FloatType(), False)]) # update top_n_similar_user field - df = df.withColumn('top_n_similar_user', udf(calculate_similarity, ArrayType(elements_type))(df.score_vector, df.top_n_similar_user, df.c1)) + df = df.withColumn('top_n_similar_user', udf(calculate_similarity(cross_users_did_score, c2), + ArrayType(ArrayType(elements_type)))(df.score_matrix, df.top_n_similar_user, df.c1_list)) + + # Unpack the matrices into individual users. + # Note: in Spark 2.4, the udf can be replaced with arrays_zip(). 
+ def combine(x, y): + return list(zip(x, y)) + df = df.withColumn("new", udf(combine, ArrayType(StructType([StructField("did", StringType()), + StructField("top_n_similar_user", ArrayType(StructType([ + StructField("did", StringType(), True), + StructField("score", FloatType(), True), ]), True)), + ])))(df.did_list, df.top_n_similar_user)) + df = df.withColumn("new", explode("new")) + df = df.select(col("new.did").alias("did"), + col("new.top_n_similar_user").alias("top_n_similar_user"), + "did_bucket") mode = 'overwrite' if first_round else 'append' # use the partitioned field at the end of the select. Order matters. - write_to_table_with_partition(df.select('did', 'top_n_similar_user', 'did_bucket'), similarity_table, partition=('did_bucket'), mode=mode) + write_to_table_with_partition(df, similarity_table, partition=('did_bucket'), mode=mode) first_round = False + batch_num += 1 if __name__ == "__main__": @@ -126,8 +142,9 @@ if __name__ == "__main__": sc = SparkContext.getOrCreate() sc.setLogLevel('INFO') hive_context = HiveContext(sc) + spark_session = SparkSession(sc) - run(sc=sc, hive_context=hive_context, cfg=cfg) + run(spark_session, hive_context, cfg) sc.stop() end = time.time() print('Runtime of the program is:', (end - start)) diff --git a/Model/lookalike-model/lookalike_model/application/rest_client.py b/Model/lookalike-model/lookalike_model/application/rest_client.py deleted file mode 100644 index c3b7a02..0000000 --- a/Model/lookalike-model/lookalike_model/application/rest_client.py +++ /dev/null @@ -1,89 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0.html - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -THE script gets the data, process it and send the request to -the rest client and print out the response from the the rest API -""" -import requests -import json -import argparse -import yaml - - -def flatten(lst): - f = [y for x in lst for y in x] - return f - - -def str_to_intlist(table): - ji = [] - for k in [table[j].split(",") for j in range(len(table))]: - s = [] - for a in k: - b = int(a.split(":")[0]) - s.append(b) - ji.append(s) - return ji - - -def inputData(record, keyword, length): - if len(record['show_counts']) >= length: - hist = flatten(record['show_counts'][:length]) - instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)} - else: - hist = flatten(record['show_counts']) - # [hist.extend([0]) for i in range(length - len(hist))] - instance = {'hist_i': hist, 'u': record['did'], 'i': keyword, 'j': keyword, 'sl': len(hist)} - return instance - - -def predict(serving_url, record, length, new_keyword): - body = {"instances": []} - for keyword in new_keyword: - instance = inputData(record, keyword, length) - body['instances'].append(instance) - body_json = json.dumps(body) - result = requests.post(serving_url, data=body_json).json() - if 'error' in result.keys(): - predictions = result['error'] - else: - predictions = result['predictions'] - return predictions - - -def run(cfg): - length = cfg['input']['din_model_length'] - url = cfg['input']['din_model_tf_serving_url'] - ##time_interval, did, click_counts, show_counts, media_category, net_type_index, gender, age, keyword - record = {"did": 0, 
"show_counts": ['25:3', '29:6,25:2', '29:1,25:2,14:2', '14:1,29:2,25:2', - '29:1', '26:1,14:2,25:4', '14:1,25:3'], "show_clicks": [], "age": '10', "gender": '3'} - record['show_counts'] = str_to_intlist(record['show_counts']) - new_keyword = [26, 27, 29] - response = predict(serving_url=url, record=record, length=length, new_keyword=new_keyword) - - print(response) - - -if __name__ == '__main__': # record is equal to window size - parser = argparse.ArgumentParser(description='Prepare data') - parser.add_argument('config_file') - args = parser.parse_args() - - with open(args.config_file, 'r') as ymlfile: - cfg = yaml.safe_load(ymlfile) - - run(cfg) diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py similarity index 87% rename from Model/lookalike-model/lookalike_model/application/pipeline/validation.py rename to Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py index 5409472..08f1f5a 100644 --- a/Model/lookalike-model/lookalike_model/application/pipeline/validation.py +++ b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_0.py @@ -20,6 +20,10 @@ from pyspark import SparkContext import argparse, yaml from util import resolve_placeholder +''' +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation.py config.yml "29" +''' + def counting(click,kwi): count_click = 0 @@ -31,10 +35,10 @@ def counting(click,kwi): def run(hive_context,cfg, kwi): lookalike_score_table = cfg["output"]["similarity_table"] - seed_user_table = cfg['input']['seeduser_table'] + seed_user_table = 'lookalike_seeduser' extend = cfg['input']['extend'] - test_table = cfg['input']['test_table'] - number_of_seeduser 
= cfg['input']['number_of_seeduser'] + test_table = 'lookalike_trainready_jimmy_test' + number_of_seeduser = 1000 ######### filtering the df and removing seed users command = "select * from {}" diff --git a/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py new file mode 100644 index 0000000..e92ea97 --- /dev/null +++ b/Model/lookalike-model/lookalike_model/application/validations/validation_plan_1.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyspark.sql.functions import lit, col, udf, rand +from pyspark.sql import HiveContext +from pyspark import SparkContext +import argparse +import yaml +from util import resolve_placeholder + +''' + +spark-submit --master yarn --num-executors 10 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict validation_plan_1.py config.yml '29' + +1. Randomly select 1000 users. It is user_list. +2. Calculate click/imp for kw. It is actual_interest. +3. 
Sort user_list based on actual_interest and get first 500. This is list M. +4. Sort user_list based on kw score and get first 500. It is list N. +5. Final result is (size of common(M,N) ) / (size of M) + + +''' + + +def process(user, kwi): + l = user['kwi_show_counts'] + imp_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0 for _ in l for j in _.split(',')]) + + l = user['kwi_click_counts'] + click_for_kwi = sum([int(j.split(':')[1]) if j.split(':')[0] == kwi else 0 for _ in l for j in _.split(',')]) + + actual_interest = click_for_kwi * 1.0 / imp_for_kwi if imp_for_kwi != 0 else 0 + + kwi_score = 0 + if kwi in user['kws']: + kwi_score = user['kws']['kwi'] + + return (user['did'], actual_interest, kwi_score) + + +def run(hive_context, cfg, kwi): + RANDOM_USERS_SIZE = 1000 + lookalike_score_table = cfg['score_generator']['output']['score_table'] + + # Randomly select 1000 users. It is user_list. + command = "SELECT * FROM {}" + df = hive_context.sql(command.format(lookalike_score_table)) + user_list = df.orderBy(rand()).limit(RANDOM_USERS_SIZE).collect() + user_metrics = [process(user, kwi) for user in user_list] + + n = sorted(user_metrics, key=lambda x: x[1], reverse=True)[:RANDOM_USERS_SIZE//2] + m = sorted(user_metrics, key=lambda x: x[2], reverse=True)[:RANDOM_USERS_SIZE//2] + + n = set([_[0] for _ in n]) + m = set([_[0] for _ in m]) + + size_of_common = len(n.intersection(m)) + result = size_of_common*1.0/(RANDOM_USERS_SIZE//2) + + print(result) + + +if __name__ == "__main__": + """ + validate the result + """ + parser = argparse.ArgumentParser(description=" ") + parser.add_argument('config_file') + parser.add_argument('kwi') + args = parser.parse_args() + kwi = args.kwi + with open(args.config_file, 'r') as yml_file: + cfg = yaml.safe_load(yml_file) + resolve_placeholder(cfg) + + sc = SparkContext.getOrCreate() + sc.setLogLevel('WARN') + hive_context = HiveContext(sc) + + run(hive_context=hive_context, cfg=cfg, kwi=kwi) + sc.stop() diff 
--git a/Model/lookalike-model/lookalike_model/config.yml b/Model/lookalike-model/lookalike_model/config.yml index dbfdf08..fe05afc 100644 --- a/Model/lookalike-model/lookalike_model/config.yml +++ b/Model/lookalike-model/lookalike_model/config.yml @@ -99,6 +99,9 @@ features: 'task_id', 'pps_inside_exprmt_ab_tag'] pipeline: + main_keywords: + keyword_output_table: '{product_tag}_{pipeline_tag}_keywords' + keyword_threshold: 0.01 # Portion of showlog traffic that a keyword must reach to be included. main_clean: did_bucket_num: 2 # Number of partitions for did load_logs_in_minutes: 14400 #1440/day, original=14400 @@ -161,9 +164,6 @@ pipeline: logs_output_table_name: '{product_tag}_{pipeline_tag}_logs' main_trainready: trainready_output_table: '{product_tag}_{pipeline_tag}_trainready' - show_threshold_low: 0.5 # per day (set to -1 for no low threshold) - show_threshold_high: -1 # per day (set to -1 for no high threshold) - active_interval_threshold: 0.2 # proportion of days (set to -1 for no active day filter) tfrecords: tfrecords_statistics_path: '{product_tag}_{pipeline_tag}_tfrecord_statistics.pkl' tfrecords_hdfs_path: '{product_tag}_{pipeline_tag}_tfrecord' # it is hdfs location for tfrecords, over-writes the existing files diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py index 9efa3c6..ffa38e9 100644 --- a/Model/lookalike-model/lookalike_model/pipeline/main_clean.py +++ b/Model/lookalike-model/lookalike_model/pipeline/main_clean.py @@ -20,7 +20,7 @@ from datetime import datetime, timedelta import timeit from pyspark import SparkContext -from pyspark.sql.functions import col, udf +from pyspark.sql.functions import col, udf, collect_set from pyspark.sql.types import BooleanType, IntegerType, StringType from pyspark.sql import HiveContext from util import load_config, load_batch_config, load_df @@ -90,6 +90,12 @@ def clean_batched_log(df, df_persona, conditions, df_keywords, 
did_bucket_num): df = add_did_bucket(df, did_bucket_num) return df +def filter_keywords(df, keywords): + # User defined function to return if the keyword is in the inclusion set. + _udf = udf(lambda x: x in keywords, BooleanType()) + + # Return the filtered dataframe. + return df.filter(_udf(col('keyword'))) def clean_logs(cfg, df_persona, df_keywords, log_table_names): sc = SparkContext.getOrCreate() @@ -185,6 +191,8 @@ def run(hive_context, cfg): did_bucket_num = cfg_clean['did_bucket_num'] + keywords_effective_table = cfg['pipeline']['main_keywords']['keyword_output_table'] + command = """SELECT did, gender_new_dev AS gender, forecast_age_dev AS age @@ -201,6 +209,12 @@ def run(hive_context, cfg): df_keywords = load_df(hive_context, keywords_table) #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')] + # Use the effective keyword table to filter the keyword table which + # will serve to filter the show and click log tables. + df_effective_keywords = load_df(hive_context, keywords_effective_table) + effective_keywords = df_effective_keywords.select(collect_set('keyword')).first()[0] + df_keywords = filter_keywords(df_keywords, effective_keywords) + log_table_names = (showlog_table, showlog_new_table, clicklog_table, clicklog_new_table) clean_logs(cfg, df_persona, df_keywords, log_table_names) diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py new file mode 100644 index 0000000..e09e16b --- /dev/null +++ b/Model/lookalike-model/lookalike_model/pipeline/main_keywords.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta +from pyspark import SparkContext + +from util import load_config, load_batch_config, print_batching_info +from util import write_to_table, generate_add_keywords, resolve_placeholder + + +def run(hive_context, showlog_table, keywords_mapping_table, create_keywords_mapping, + start_date, end_date, load_minutes, keyword_threshold, effective_keywords_table): + """ + # This script goes through the showlog and identifies all the + # keywords that comprise a portion of the overall traffic greater + # than the specified threshold. + """ + + # Create ad keywords table if does not exist. + if create_keywords_mapping: + generate_add_keywords(keywords_mapping_table) + #[Row(keyword=u'education', keyword_index=1, spread_app_id=u'C100203741')] + + starting_time = datetime.strptime(start_date, "%Y-%m-%d") + ending_time = datetime.strptime(end_date, "%Y-%m-%d") + + # In batches, get the show counts for all of the keywords. 
+ keyword_totals = {} + batched_round = 1 + while starting_time < ending_time: + time_start = starting_time.strftime("%Y-%m-%d %H:%M:%S") + batch_time_end = starting_time + timedelta(minutes=load_minutes) + batch_time_end = min(batch_time_end, ending_time) + time_end = batch_time_end.strftime("%Y-%m-%d %H:%M:%S") + print_batching_info("Main keywords", batched_round, time_start, time_end) + + # Get the impressions for the time window joined with the keywords. + command = """SELECT + logs.spread_app_id, + logs.show_time, + kw.keyword + FROM {log_table} as logs inner join {keyword_table} as kw on logs.spread_app_id = kw.spread_app_id + WHERE logs.show_time >= '{time_start}' AND show_time < '{time_end}' """ + df_showlog_batched = hive_context.sql(command.format(log_table=showlog_table, + keyword_table=keywords_mapping_table, time_start=time_start, time_end=time_end)) + + # Get the number of impressions for each keyword. + df = df_showlog_batched.groupby('keyword').count().collect() + + # Add the impression count for each keyword to the dictionary. + for row in df: + keyword_totals[row['keyword']] = keyword_totals.get(row['keyword'], 0) + int(row['count']) + starting_time = batch_time_end + batched_round += 1 + + # With the total keyword counts calculated, identify the keywords that meet + # the threshold to be included. + # Get the total and calculate the count threshold for effective keywords. + total_impressions = sum(keyword_totals.values()) + impression_threshold = keyword_threshold * total_impressions + + # For each keyword, if its count is greater than the threshold, add + # it to the effective keyword list. + effective_keywords = [] + for key, value in keyword_totals.items(): + if value > impression_threshold: + effective_keywords.append((key,)) # Append as a tuple + + # Create the dataframe with the results and save to Hive. 
+ sc = SparkContext.getOrCreate() + df_effective_keywords = sc.parallelize(effective_keywords).toDF(['keyword']) + write_to_table(df_effective_keywords, effective_keywords_table) + + +if __name__ == "__main__": + + """ + main_keywords is a process to identify the effective keywords that + comprise a percentage of the traffic above a given threshold. + """ + sc, hive_context, cfg = load_config( + description="clean data of persona, clicklog and showlog.") + resolve_placeholder(cfg) + + cfg_clean = cfg['pipeline']['main_clean'] + showlog_table = cfg['showlog_table_name'] + keywords_mapping_table = cfg['keywords_table'] + create_keywords_mapping = cfg_clean['create_keywords'] + + cfg_keywords = cfg['pipeline']['main_keywords'] + keyword_threshold = cfg_keywords['keyword_threshold'] + effective_keywords_table = cfg_keywords['keyword_output_table'] + + start_date, end_date, load_minutes = load_batch_config(cfg) + + run(hive_context, showlog_table, keywords_mapping_table, create_keywords_mapping, + start_date, end_date, load_minutes, keyword_threshold, effective_keywords_table) + + sc.stop() + diff --git a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py index cf8218f..0fc3247 100644 --- a/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py +++ b/Model/lookalike-model/lookalike_model/pipeline/main_trainready.py @@ -23,7 +23,7 @@ from pyspark import SparkContext from pyspark.sql import functions as fn from pyspark.sql.functions import lit, col, udf, collect_list, concat_ws, first, create_map, monotonically_increasing_id, row_number from pyspark.sql.window import Window -from pyspark.sql.types import FloatType, IntegerType, ArrayType, StringType, LongType +from pyspark.sql.types import IntegerType, ArrayType, StringType, LongType from pyspark.sql import HiveContext from datetime import datetime, timedelta from util import write_to_table, write_to_table_with_partition, 
print_batching_info, resolve_placeholder, load_config, load_batch_config, load_df @@ -36,54 +36,10 @@ def date_to_timestamp(dt): epoch = datetime.utcfromtimestamp(0) return int((dt - epoch).total_seconds()) -def filter_users(df, show_threshold_low, show_threshold_high, active_interval_threshold, total_num_intervals): - def list_map_count(x): - value_total = 0 - for cell in x: - key_values = cell.split(',') - for key_value in key_values: - print('list_map_count: {}'.format(key_value)) - _, value = key_value.split(':') - value_total += int(value) - return float(value_total)/total_num_intervals - - # Returns the ratio of items in the list that have non-zero values. - # def list_map_nonzero_count(x): - def list_map_nonzero_ratio(x): - active_intervals = 0 - for cell in x: - key_values = cell.split(',') - for key_value in key_values: - print('list_map_nonzero_ratio: {}'.format(key_value)) - _, value = key_value.split(':') - if int(value) != 0: - active_intervals += 1 - break - return float(active_intervals)/total_num_intervals - - # Aggregate user activity by impressions. - df = df.withColumn('total_show_count', udf(list_map_count, FloatType())(col('kwi_show_counts'))) - - # Filter out users below set activity level. - if show_threshold_low >= 0: - df = df.filter(df.total_show_count > show_threshold_low) - if show_threshold_high >= 0: - df = df.filter(df.total_show_count < show_threshold_high) - - # Calculate the number of active intervals. - df = df.withColumn('show_active_interval_ratio', udf(list_map_nonzero_ratio, FloatType())(col('kwi_show_counts'))) - - # Filter out users below set number of active intervals. 
- if (active_interval_threshold >= 0): - df = df.filter(df.show_active_interval_ratio > active_interval_threshold) - - return df - def generate_trainready(hive_context, batch_config, interval_time_in_seconds, - logs_table_name, trainready_table, did_bucket_num, - show_threshold_low, show_threshold_high, active_interval_threshold): + logs_table_name, trainready_table, did_bucket_num): def group_batched_logs(df_logs): # group logs from did + interval_time + keyword. @@ -240,9 +196,6 @@ def generate_trainready(hive_context, batch_config, for i, feature_name in enumerate(['interval_starting_time', 'interval_keywords', 'kwi', 'kwi_show_counts', 'kwi_click_counts']): df = df.withColumn(feature_name, col('metrics_list').getItem(i)) - # Filter the users by activity level. - df = filter_users(df, show_threshold_low, show_threshold_high, active_interval_threshold, len(all_intervals)) - # Add did_index w = Window.orderBy("did_bucket", "did") df = df.withColumn('row_number', row_number().over(w)) @@ -265,17 +218,11 @@ def run(hive_context, cfg): cfg_train = cfg['pipeline']['main_trainready'] trainready_table = cfg_train['trainready_output_table'] - show_threshold_low = cfg_train['show_threshold_low'] - show_threshold_high = cfg_train['show_threshold_high'] - active_interval_threshold = cfg_train['active_interval_threshold'] - did_bucket_num = cfg_clean['did_bucket_num'] batch_config = load_batch_config(cfg) - generate_trainready(hive_context, batch_config, interval_time_in_seconds, - logs_table_name, trainready_table, did_bucket_num, - show_threshold_low, show_threshold_high, active_interval_threshold) + generate_trainready(hive_context, batch_config, interval_time_in_seconds, logs_table_name, trainready_table, did_bucket_num) if __name__ == "__main__": diff --git a/Model/lookalike-model/lookalike_model/run.sh b/Model/lookalike-model/lookalike_model/run.sh index b24adab..201fa0a 100644 --- a/Model/lookalike-model/lookalike_model/run.sh +++ 
b/Model/lookalike-model/lookalike_model/run.sh @@ -1,5 +1,12 @@ #!/bin/bash +# main_keywords: identify the keywords with proportion of traffic above set threshold. +if false +then + # generate the effective keywords table. + spark-submit --master yarn --num-executors 20 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/main_keywords.py config.yml +fi + # main_clean: preparing cleaned persona, click and show logs data. if false then diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md b/Model/lookalike-model/tests/application/README.md similarity index 100% copy from Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md copy to Model/lookalike-model/tests/application/README.md diff --git a/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml new file mode 100644 index 0000000..eec98fc --- /dev/null +++ b/Model/lookalike-model/tests/application/pipeline/config_score_matrix_table.yml @@ -0,0 +1,38 @@ +product_tag: 'lookalike_application' +pipeline_tag: 'unittest_score_matrix_table' +score_generator: + input: + log_table : "lookalike_03042021_logs" + did_table: "lookalike_03042021_trainready" + keywords_table: "din_ad_keywords_09172020" + din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict" + din_model_length: 20 + extend: 2000 + alg: "euclidean" ##### currently just support "euclideand" and "dot" + output: + score_table: "{product_tag}_score_{pipeline_tag}" + normalize: False + +score_vector: + keywords_table: "din_ad_keywords_09172020" + score_table: "{product_tag}_{pipeline_tag}_score" + score_vector_table: '{product_tag}_{pipeline_tag}_input_score_vector' + did_bucket_size: 2 + did_bucket_step: 2 
+score_vector_rebucketing: + did_bucket_size: 2 + did_bucket_step: 1 + alpha_did_bucket_size: 4 + score_vector_alpha_table: '{product_tag}_{pipeline_tag}_input_score_vector_alpha' +score_matrix_table: + did_bucket_size: 1 + did_bucket_step: 1 + score_matrix_table: '{product_tag}_{pipeline_tag}_output_score_matrix' +top_n_similarity: + did_bucket_size: 1 + did_bucket_step: 1 + cross_bucket_size: 1 + cross_bucket_step: 1 + alpha_did_bucket_step: 1 + top_n: 10 + similarity_table: "{product_tag}_{pipeline_tag}_output_similarity" \ No newline at end of file diff --git a/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml new file mode 100644 index 0000000..d18d6e9 --- /dev/null +++ b/Model/lookalike-model/tests/application/pipeline/config_top_n_similarity.yml @@ -0,0 +1,38 @@ +product_tag: 'lookalike_application' +pipeline_tag: 'unittest_top_n_similarity' +score_generator: + input: + log_table : "lookalike_03042021_logs" + did_table: "lookalike_03042021_trainready" + keywords_table: "din_ad_keywords_09172020" + din_model_tf_serving_url: "http://10.193.217.105:8506/v1/models/lookalike:predict" + din_model_length: 20 + extend: 2000 + alg: "euclidean" ##### currently just support "euclideand" and "dot" + output: + score_table: "{product_tag}_score_{pipeline_tag}" + normalize: False + +score_vector: + keywords_table: "din_ad_keywords_09172020" + score_table: "{product_tag}_{pipeline_tag}_score" + score_vector_table: "{product_tag}_{pipeline_tag}_score_vector" + did_bucket_size: 2 + did_bucket_step: 2 +score_vector_rebucketing: + did_bucket_size: 2 + did_bucket_step: 1 + alpha_did_bucket_size: 4 + score_vector_alpha_table: '{product_tag}_{pipeline_tag}_input_score_vector_alpha' +score_matrix_table: + did_bucket_size: 2 + did_bucket_step: 1 + score_matrix_table: '{product_tag}_{pipeline_tag}_input_score_matrix' +top_n_similarity: + did_bucket_size: 1 + did_bucket_step: 1 + 
cross_bucket_size: 1 + cross_bucket_step: 1 + alpha_did_bucket_step: 1 + top_n: 10 + similarity_table: "{product_tag}_{pipeline_tag}_output_similarity" \ No newline at end of file diff --git a/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py new file mode 100644 index 0000000..035c994 --- /dev/null +++ b/Model/lookalike-model/tests/application/pipeline/test_score_matrix_table.py @@ -0,0 +1,224 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import yaml +from pyspark import SparkContext +from pyspark.sql import SparkSession, HiveContext +from pyspark.sql.types import DataType, StringType, StructField, StructType, FloatType, IntegerType, ArrayType, MapType +from lookalike_model.application.pipeline import util, score_matrix_table + + +class TestTopNSimilarityTableGenerator(unittest.TestCase): + + def setUp (self): + # Set the log level. 
+ self.sc = SparkContext.getOrCreate() + self.sc.setLogLevel('ERROR') + + # Initialize the Spark session + self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate() + self.hive_context = HiveContext(self.sc) + + def test_run(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run ***') + + # Load the test configuration. + with open('application/pipeline/config_score_matrix_table.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + + # Get the names of the input and output tables. + vector_table = cfg['score_vector']['score_vector_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + + # Create the input table. + self.create_vector_table(vector_table, True) + + # Run the function being tested. + score_matrix_table.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did_bucket""".format(matrix_table) + df = self.hive_context.sql(command) + df.show() + + command = """select * from {} order by did_bucket""".format(vector_table) + df_vector = self.hive_context.sql(command) + + # Validate the output. + self.validate_similarity_table(df, df_vector, True) + + def test_run2(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***') + + # Load the test configuration. + with open('application/pipeline/config_score_matrix_table.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + + # Get the names of the input and output tables. + vector_table = cfg['score_vector']['score_vector_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + + cfg['score_matrix_table']['did_bucket_size'] = 2 + + # Create the input table. + self.create_vector_table(vector_table, False) + + # Run the function being tested. + score_matrix_table.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. 
+ command = """select * from {} order by did_bucket""".format(matrix_table) + df = self.hive_context.sql(command) + df.show() + + command = """select * from {} order by did_bucket""".format(vector_table) + df_vector = self.hive_context.sql(command) + + # Validate the output. + self.validate_similarity_table(df, df_vector, False) + + def test_run3(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***') + + # Load the test configuration. + with open('application/pipeline/config_score_matrix_table.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + + # Get the names of the input and output tables. + vector_table = cfg['score_vector']['score_vector_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + + cfg['score_matrix_table']['did_bucket_size'] = 2 + cfg['score_matrix_table']['did_bucket_step'] = 2 + + # Create the input table. + self.create_vector_table(vector_table, False) + + # Run the function being tested. + score_matrix_table.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did_bucket""".format(matrix_table) + df = self.hive_context.sql(command) + df.show() + + command = """select * from {} order by did_bucket""".format(vector_table) + df_vector = self.hive_context.sql(command) + + # Validate the output. 
+ self.validate_similarity_table(df, df_vector, False) + + def create_vector_table (self, table_name, same_bucket=False): + later_bucket = 1 + if same_bucket: + later_bucket = 0 + + data = [ + ('0000001', [0.1, 0.8, 0.9], 1.46, 0), + ('0000002', [0.1, 0.1, 0.1], 0.03, 0), + ('0000003', [0.1, 0.8, 0.9], 1.46, later_bucket), + ('0000004', [0.1, 0.2, 0.3], 0.14, later_bucket), + ] + + schema = StructType([ + StructField("did", StringType(), True), + StructField("score_vector", ArrayType(FloatType(), True), True), + StructField("c1", FloatType(), True), + StructField("did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + util.write_to_table(df, table_name) + + def create_matrix(self): + data = [ + (['0000001', '0000002', '0000003', '0000004'], + [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], + [1.46, 0.03, 1.46, 0.14], + 0) + ] + + schema = StructType([ + StructField("did_list", ArrayType(StringType(), True)), + StructField("score_matrix", ArrayType(ArrayType(FloatType(), True)), True), + StructField("c1_list", ArrayType(FloatType(), True)), + StructField("did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + return df + + def create_matrix2(self): + data = [ + (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 0.03], 0), + (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1) + ] + + schema = StructType([ + StructField("did_list", ArrayType(StringType(), True)), + StructField("score_matrix", ArrayType(ArrayType(FloatType(), True)), True), + StructField("c1_list", ArrayType(FloatType(), True)), + StructField("did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + return df + + def validate_similarity_table (self, df, df_vector, same_bucket=False): + if same_bucket: + df_ref = 
self.create_matrix() + else: + df_ref = self.create_matrix2() + + # Verify the column names. + self.assertEqual(len(df.columns), len(df_ref.columns)) + for name in df.columns: + self.assertIn(name, df_ref.columns) + + # Verify the number of rows. + self.assertEqual(df.count(), df_ref.count()) + + # Index the score vector by did_bucket and did for quick reference. + vectors = {} + for row in df_vector.collect(): + did = row['did'] + score_vector = row['score_vector'] + c1 = row['c1'] + did_bucket = row['did_bucket'] + if did_bucket not in vectors: + vectors[did_bucket] = {did: (score_vector, c1)} + else: + vectors[did_bucket][did] = (score_vector, c1) + + # Check the row values. + for row in df.collect(): + did_bucket = row['did_bucket'] + self.assertIn(did_bucket, vectors) + did_map = vectors[did_bucket] + for did, vector, c1 in zip(row['did_list'], row['score_matrix'], row['c1_list']): + self.assertIn(did, did_map) + self.assertEqual(vector, did_map[did][0]) + self.assertEqual(c1, did_map[did][1]) + + +# Runs the tests. +if __name__ == '__main__': + # Run the unit tests. + unittest.main() \ No newline at end of file diff --git a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py new file mode 100644 index 0000000..ddecac8 --- /dev/null +++ b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_1.py @@ -0,0 +1,183 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import yaml +from pyspark import SparkContext +from pyspark.sql import HiveContext, Row, SparkSession +from pyspark.sql.functions import col, udf, collect_set +from pyspark.sql.types import IntegerType, BooleanType, StructType, StructField, StringType, StructType, ArrayType, FloatType +from lookalike_model.pipeline import main_clean, util +from lookalike_model.pipeline.util import write_to_table +from lookalike_model.application.pipeline import top_n_similarity_table_generator +import random +import string + +''' +spark-submit --master yarn --num-executors 2 --executor-cores 5 --executor-memory 8G --driver-memory 8G --conf spark.driver.maxResultSize=5g --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict test_top_n_similarity_table_generator_1.py +''' + + +def random_string_generator(str_size): + PREFIX = 'lookalike_application_unittest_' + return PREFIX+''.join(random.choice(string.ascii_letters) for _ in range(str_size)) + + +class TestMainClean(unittest.TestCase): + + def setUp(self): + # Set the log level. 
+ self.sc = SparkContext.getOrCreate() + self.sc.setLogLevel('ERROR') + + # Initialize the Spark session + self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate() + self.hive_context = HiveContext(self.sc) + + def drop_table(self, table_name): + self.hive_context.sql('DROP TABLE {}'.format(table_name)) + + def create_matrix_table(self, data, table_name): + schema = StructType([ + StructField("did_list", ArrayType(StringType(), False)), + StructField("score_matrix", ArrayType(ArrayType(FloatType(), False)), False), + StructField("c1_list", ArrayType(FloatType(), False)), + StructField("did_bucket", IntegerType(), False) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + util.write_to_table(df, table_name) + + def run_top_n_similarity_table_generator(self, cfg, _input): + self.create_matrix_table(_input, cfg['score_matrix_table']['score_matrix_table']) + top_n_similarity_table_generator.run(self.sc, self.hive_context, cfg) + result = self.hive_context.sql('SELECT did,top_n_similar_user,did_bucket FROM {}'.format(cfg['top_n_similarity']['similarity_table'])) + return result + + def compare_output_and_expected_and_cleanup(self, cfg, test_name, df_output, _expected_output): + elements_type = StructType([StructField('did', StringType(), False), StructField('score', FloatType(), False)]) + _schema = StructType([StructField('did', StringType(), False), StructField('top_n_similar_user', + ArrayType(elements_type), False), StructField('did_bucket', IntegerType(), False)]) + df_expected_output = self.hive_context.createDataFrame(_expected_output, _schema) + + df_output = df_output.sort('did') + df_expected_output = df_expected_output.sort('did') + + print('Test name : {}'.format(test_name)) + + print('Expected') + df_expected_output.show(10, False) + + print('Output') + df_output.show(10, False) + + diff = df_output.subtract(df_expected_output) + print('Difference') + diff.show(10, False) + diff_count 
= diff.count() + + # Clean up: Remove tmp tables + self.drop_table(cfg['score_matrix_table']['score_matrix_table']) + self.drop_table(cfg['top_n_similarity']['similarity_table']) + + return diff_count == 0 + + def test_run_1(self): + cfg = { + 'score_matrix_table': { + 'did_bucket_size': 2, + 'did_bucket_step': 2, + 'score_matrix_table': random_string_generator(10) + }, + + 'top_n_similarity': {'did_bucket_size': 2, + 'did_bucket_step': 2, + 'cross_bucket_size': 2, + 'top_n': 10, + 'similarity_table': random_string_generator(10)} + } + + _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46, 1.46], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.8, 0.9]], [1.46, 1.46], 1)] + _expected_output = [ + ('1', [Row(did='1', score=1.7316996), Row(did='2', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=1.7316996)], 0), + ('2', [Row(did='1', score=1.7316996), Row(did='2', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=1.7316996)], 0), + ('3', [Row(did='1', score=1.7316996), Row(did='2', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=1.7316996)], 1), + ('4', [Row(did='1', score=1.7316996), Row(did='2', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=1.7316996)], 1), + ] + + df_output = self.run_top_n_similarity_table_generator(cfg, _input) + are_equal = self.compare_output_and_expected_and_cleanup(cfg, 'test_run_1', df_output, _expected_output) + self.assertTrue(are_equal) + + def test_run_2(self): + cfg = { + 'score_matrix_table': { + 'did_bucket_size': 2, + 'did_bucket_step': 2, + 'score_matrix_table': random_string_generator(10) + }, + + 'top_n_similarity': {'did_bucket_size': 2, + 'did_bucket_step': 2, + 'cross_bucket_size': 2, + 'top_n': 10, + 'similarity_table': random_string_generator(10)} + } + + _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)] + _expected_output = [ + ('1', 
[Row(did='1', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=0.8835226), Row(did='2', score=0.6690362)], 0), + ('2', [Row(did='2', score=1.7320508), Row(did='4', score=1.5084441), Row(did='3', score=0.6690362), Row(did='1', score=0.6690362)], 0), + ('3', [Row(did='1', score=1.7316996), Row(did='3', score=1.7316996), Row(did='4', score=0.8835226), Row(did='2', score=0.6690362)], 1), + ('4', [Row(did='4', score=1.7320508), Row(did='2', score=1.5084441), Row(did='3', score=0.8835226), Row(did='1', score=0.8835226)], 1), + ] + + df_output = self.run_top_n_similarity_table_generator(cfg, _input) + are_equal = self.compare_output_and_expected_and_cleanup(cfg, 'test_run_2', df_output, _expected_output) + self.assertTrue(are_equal) + + def test_run_3(self): + cfg = { + 'score_matrix_table': { + 'did_bucket_size': 2, + 'did_bucket_step': 2, + 'score_matrix_table': random_string_generator(10) + }, + + 'top_n_similarity': {'did_bucket_size': 2, + 'did_bucket_step': 2, + 'cross_bucket_size': 2, + 'top_n': 2, + 'similarity_table': random_string_generator(10)} + } + + _input = [(['1', '2'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 0.03], 0), (['3', '4'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1)] + _expected_output = [ + ('1', [Row(did='1', score=1.7316996), Row(did='3', score=1.7316996)], 0), + ('2', [Row(did='2', score=1.7320508), Row(did='4', score=1.5084441)], 0), + ('3', [Row(did='1', score=1.7316996), Row(did='3', score=1.7316996)], 1), + ('4', [Row(did='4', score=1.7320508), Row(did='2', score=1.5084441)], 1), + ] + + df_output = self.run_top_n_similarity_table_generator(cfg, _input) + are_equal = self.compare_output_and_expected_and_cleanup(cfg, 'test_run_3', df_output, _expected_output) + self.assertTrue(are_equal) + + +# Runs the tests. +if __name__ == '__main__': + # Run the unit tests. 
+ unittest.main() diff --git a/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py new file mode 100644 index 0000000..d7a56ae --- /dev/null +++ b/Model/lookalike-model/tests/application/pipeline/test_top_n_similarity_table_generator_2.py @@ -0,0 +1,282 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import yaml +from pyspark import SparkContext +from pyspark.sql import SparkSession, HiveContext +from pyspark.sql.types import DataType, StringType, StructField, StructType, FloatType, IntegerType, ArrayType, MapType +from lookalike_model.application.pipeline import util, top_n_similarity_table_generator + + +class TestTopNSimilarityTableGenerator(unittest.TestCase): + + def setUp (self): + # Set the log level. + self.sc = SparkContext.getOrCreate() + self.sc.setLogLevel('ERROR') + + # Initialize the Spark session + self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate() + self.hive_context = HiveContext(self.sc) + + def test_run(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run ***') + + # Load the test configuration. 
+ with open('application/pipeline/config_top_n_similarity.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + + # Get the names of the input and output tables. + # alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + top_n_table = cfg['top_n_similarity']['similarity_table'] + + # Create the input table. + # self.create_alpha_table(alpha_table) + self.create_matrix_table(matrix_table) + + # Run the function being tested. + top_n_similarity_table_generator.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did""".format(top_n_table) + df = self.hive_context.sql(command) + df.show() + + # Validate the output. + self.validate_similarity_table(df, True) + + def test_run2(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run2 ***') + + # Load the test configuration. + with open('application/pipeline/config_top_n_similarity.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + cfg['top_n_similarity']['did_bucket_size'] = 2 + cfg['top_n_similarity']['cross_bucket_size'] = 2 + + # Get the names of the input and output tables. + # alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + top_n_table = cfg['top_n_similarity']['similarity_table'] + + # Create the input table. + # self.create_alpha_table(alpha_table) + self.create_matrix_table2(matrix_table) + + # Run the function being tested. + top_n_similarity_table_generator.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did""".format(top_n_table) + df = self.hive_context.sql(command) + df.show() + + # Validate the output. 
+ self.validate_similarity_table(df) + + def test_run3(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run3 ***') + + # Load the test configuration. + with open('application/pipeline/config_top_n_similarity.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + cfg['top_n_similarity']['did_bucket_size'] = 2 + cfg['top_n_similarity']['did_bucket_step'] = 2 + cfg['top_n_similarity']['cross_bucket_size'] = 2 + + # Get the names of the input and output tables. + # alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + top_n_table = cfg['top_n_similarity']['similarity_table'] + + # Create the input table. + # self.create_alpha_table(alpha_table) + self.create_matrix_table2(matrix_table) + + # Run the function being tested. + top_n_similarity_table_generator.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did""".format(top_n_table) + df = self.hive_context.sql(command) + df.show() + + # Validate the output. + self.validate_similarity_table(df) + + def test_run4(self): + print('*** Running TestTopNSimilarityTableGenerator.test_run4 ***') + + # Load the test configuration. + with open('application/pipeline/config_top_n_similarity.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + util.resolve_placeholder(cfg) + cfg['top_n_similarity']['did_bucket_size'] = 2 + cfg['top_n_similarity']['did_bucket_step'] = 2 + cfg['top_n_similarity']['cross_bucket_size'] = 2 + cfg['top_n_similarity']['cross_bucket_step'] = 2 + + # Get the names of the input and output tables. + # alpha_table = cfg['score_vector_rebucketing']['score_vector_alpha_table'] + matrix_table = cfg['score_matrix_table']['score_matrix_table'] + top_n_table = cfg['top_n_similarity']['similarity_table'] + + # Create the input table. 
+ # self.create_alpha_table(alpha_table) + self.create_matrix_table2(matrix_table) + + # Run the function being tested. + top_n_similarity_table_generator.run(self.spark, self.hive_context, cfg) + + # Load the output of the function. + command = """select * from {} order by did""".format(top_n_table) + df = self.hive_context.sql(command) + df.show() + + # Validate the output. + self.validate_similarity_table(df) + + def create_matrix_table(self, table_name): + data = [ + (['0000001', '0000002', '0000003', '0000004'], + [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1], [0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], + [1.46, 0.03, 1.46, 0.14], + 0) + ] + + schema = StructType([ + StructField("did_list", ArrayType(StringType(), True)), + StructField("score_matrix", ArrayType(ArrayType(FloatType(), True)), True), + StructField("c1_list", ArrayType(FloatType(), True)), + StructField("did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + util.write_to_table(df, table_name) + + def create_matrix_table2(self, table_name): + data = [ + (['0000001', '0000002'], [[0.1, 0.8, 0.9], [0.1, 0.1, 0.1]], [1.46, 0.03], 0), + (['0000003', '0000004'], [[0.1, 0.8, 0.9], [0.1, 0.2, 0.3]], [1.46, 0.14], 1) + ] + + schema = StructType([ + StructField("did_list", ArrayType(StringType(), True)), + StructField("score_matrix", ArrayType(ArrayType(FloatType(), True)), True), + StructField("c1_list", ArrayType(FloatType(), True)), + StructField("did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + util.write_to_table(df, table_name) + + def create_alpha_table (self, table_name): + data = [ + ('0000001', [0.1, 0.8, 0.9], 1.46, 0, 0), + ('0000002', [0.1, 0.1, 0.1], 0.03, 0, 1), + ('0000003', [0.1, 0.8, 0.9], 1.46, 1, 2), + ('0000004', [0.1, 0.2, 0.3], 0.14, 1, 3), + ] + + schema = StructType([ + StructField("did", StringType(), True), + StructField("score_vector", 
ArrayType(FloatType(), True), True), + StructField("c1", FloatType(), True), + StructField("did_bucket", IntegerType(), True), + StructField("alpha_did_bucket", IntegerType(), True) + ]) + + df = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + util.write_to_table(df, table_name) + + def validate_similarity_table (self, df, same_bucket=False): + later_bucket = 1 + if same_bucket: + later_bucket = 0 + data = [ + ('0000001', [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], 0), + ('0000002', [{'did':'0000002', 'score':1.73205081}, {'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003', 'score':0.66903623}], 0), + ('0000003', [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], later_bucket), + ('0000004', [{'did':'0000004', 'score':1.73205081}, {'did':'0000002', 'score':1.50844401}, {'did':'0000001', 'score':0.88532267}, {'did':'0000003', 'score':0.88532267}], later_bucket) + ] + + # data = [ + # (['0000001', '0000002', '0000003', '0000004'], + # [[{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], + # [{'did':'0000002', 'score':1.73205081}, {'did':'0000004', 'score':1.50844401}, {'did':'0000001', 'score':0.66903623}, {'did':'0000003', 'score':0.66903623}], + # [{'did':'0000001', 'score':1.73205081}, {'did':'0000003', 'score':1.73205081}, {'did':'0000004', 'score':0.88532267}, {'did':'0000002', 'score':0.66903623}], + # [{'did':'0000004', 'score':1.73205081}, {'did':'0000002', 'score':1.50844401}, {'did':'0000001', 'score':0.88532267}, {'did':'0000003', 'score':0.88532267}]], + # 0) + # ] + + schema = StructType([ + StructField("did", StringType(), True), + 
StructField("top_n_similar_user", ArrayType(MapType(StringType(), StringType(), True), True)), + StructField("did_bucket", IntegerType(), True) + ]) + + df_ref = self.spark.createDataFrame(self.spark.sparkContext.parallelize(data), schema) + # util.write_to_table(df, table_name) + + # Verify the column names. + self.assertEqual(len(df.columns), len(df_ref.columns)) + for name in df.columns: + self.assertIn(name, df_ref.columns) + + # Verify the number of rows. + self.assertEqual(df.count(), df_ref.count()) + + # Check the row values. + for row, row_ref in zip(df.collect(), df_ref.collect()): + self.assertEqual(row['did'], row_ref['did']) + self.assertEqual(row['did_bucket'], row_ref['did_bucket']) + top_n = row['top_n_similar_user'] + top_n_ref = row_ref['top_n_similar_user'] + top_n_ordered = [] + + # Convert the reference list into a list of score/[did] tuples. + for ref in top_n_ref: + if len(top_n_ordered) == 0 or float(ref['score']) != top_n_ordered[-1][0]: + top_n_ordered.append((float(ref['score']), [ ref['did'] ])) + else: + top_n_ordered[-1][1].append(ref['did']) + print(top_n_ordered) + + # Verify the similarity order and values. + index = 0 + index_count = 0 + for item in top_n: + self.assertAlmostEqual(item['score'], top_n_ordered[index][0], 2) + self.assertIn(item['did'], top_n_ordered[index][1]) + index_count += 1 + if (index_count == len(top_n_ordered[index][1])): + index += 1 + index_count = 0 + + + + + + +# Runs the tests. +if __name__ == '__main__': + # Run the unit tests. 
+ unittest.main() \ No newline at end of file diff --git a/Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md b/Model/lookalike-model/tests/pipeline/README.md similarity index 100% rename from Model/lookalike-model/lookalike_model/application/pipeline/dags/README.md rename to Model/lookalike-model/tests/pipeline/README.md diff --git a/Model/lookalike-model/tests/pipeline/config_clean.yml b/Model/lookalike-model/tests/pipeline/config_clean.yml index 8ae2c63..546a0b5 100644 --- a/Model/lookalike-model/tests/pipeline/config_clean.yml +++ b/Model/lookalike-model/tests/pipeline/config_clean.yml @@ -7,6 +7,8 @@ keywords_table: 'lookalike_unittest_clean_input_keywords' log: level: 'ERROR' # log level for spark and app pipeline: + main_keywords: + keyword_output_table: 'lookalike_unittest_clean_input_effective_keywords' main_clean: did_bucket_num: 2 # Number of partitions for did load_logs_in_minutes: 1440 #1440/day, original=14400 diff --git a/Model/lookalike-model/tests/pipeline/config_keywords.yml b/Model/lookalike-model/tests/pipeline/config_keywords.yml new file mode 100644 index 0000000..4f95b65 --- /dev/null +++ b/Model/lookalike-model/tests/pipeline/config_keywords.yml @@ -0,0 +1,22 @@ +product_tag: 'lookalike' +pipeline_tag: 'unittest' +persona_table_name: 'lookalike_unittest_keywords_input_persona' +showlog_table_name: 'lookalike_unittest_keywords_input_showlog' +clicklog_table_name: 'lookalike_unittest_keywords_input_clicklog' +keywords_table: 'lookalike_unittest_keywords_input_keywords' +log: + level: 'ERROR' # log level for spark and app +pipeline: + main_keywords: + keyword_output_table: 'lookalike_unittest_keywords_output_keywords' + keyword_threshold: 0.1 # Portion of showlog traffic that a keyword must reach to be included. + main_clean: + did_bucket_num: 2 # Number of partitions for did + load_logs_in_minutes: 1440 #1440/day, original=14400 + create_keywords: False # set True for first run, then keep False to use the created table. 
+ conditions: { + 'starting_date': '2020-01-01', + 'ending_date': '2020-01-11' + } + cutting_date: 1584748800 + length: 10 diff --git a/Model/lookalike-model/tests/pipeline/data_generator.py b/Model/lookalike-model/tests/pipeline/data_generator.py index 45711ce..59fc0ce 100644 --- a/Model/lookalike-model/tests/pipeline/data_generator.py +++ b/Model/lookalike-model/tests/pipeline/data_generator.py @@ -68,6 +68,20 @@ def create_unified_log_table (spark, table_name): df = create_unified_log(spark) write_to_table(df, table_name) +# Creates raw showlog data and writes it to Hive. +def create_keywords_showlog_table (spark, table_name): + df = create_keywords_raw_log(spark) + df = df.withColumnRenamed('media', 'adv_type') + df = df.withColumnRenamed('price_model', 'adv_bill_mode_cd') + df = df.withColumnRenamed('action_time', 'show_time') + df.printSchema() + write_to_table(df, table_name) + +# Creates the effective keywords data and writes it to Hive. +def create_effective_keywords_table(spark, table_name): + df = create_effective_keywords(spark) + write_to_table(df, table_name) + #========================================== # Create dataframes for the unit tests #========================================== @@ -164,20 +178,6 @@ def create_raw_log (spark): def create_cleaned_log (spark): data = [ ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 13:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 14:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', 
'0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 15:59:59.99', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:00:00.00', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 16:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 17:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 18:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 19:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 20:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 21:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 22:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), - # ('C000', '0000001', '1000', 'splash', 'abcdef0', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 23:34:56.78', 'Huawei Magazine', 0, 0, 'travel', '1', '2020-01-01', '1', ), ('C001', '0000002', '1000', 'splash', 'abcdef1', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78', 'Huawei Browser', 1, 0, 'travel', '1', '2020-01-02', '1', ), 
('C002', '0000003', '1001', 'native', 'abcdef2', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78', 'Huawei Video', 0, 1, 'travel', '1', '2020-01-03', '1', ), ('C010', '0000004', '1001', 'native', 'abcdef3', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78', 'Huawei Music', 1, 1, 'game-avg', '2', '2020-01-04', '1', ), @@ -227,7 +227,17 @@ def create_keywords(spark): ('reading', 'C021', 3), ('reading', 'C022', 3), ('reading', 'C023', 3), - ('reading', 'C024', 3) + ('reading', 'C024', 3), + ('shopping', 'C030', 4), + ('shopping', 'C031', 4), + ('shopping', 'C032', 4), + ('shopping', 'C033', 4), + ('shopping', 'C034', 4), + ('education', 'C040', 5), + ('education', 'C041', 5), + ('education', 'C042', 5), + ('education', 'C043', 5), + ('education', 'C044', 5), ] schema = StructType([ @@ -360,6 +370,53 @@ def create_trainready_filter_user_data (spark): return spark.createDataFrame(spark.sparkContext.parallelize(data), schema) +# Returns a dataframe with unclean log data. +def create_keywords_raw_log (spark): + data = [ + ('0000001', '1000', 'splash', 'abcdef0', 'C000', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-01 12:34:56.78'), # travel + ('0000002', '1000', 'splash', 'abcdef1', 'C001', 'DUB-AL00', 'WIFI', 'CPC', '2020-01-02 12:34:56.78'), # travel + ('0000003', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000005', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000006', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000007', '1001', 'native', 'abcdef2', 'C002', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000008', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000009', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000010', '1001', 'native', 'abcdef2', 'C003', 'ABC-AL00', '4G', 
'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000011', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000012', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000013', '1001', 'native', 'abcdef2', 'C004', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # travel + ('0000014', '1001', 'native', 'abcdef2', 'C010', 'ABC-AL00', '4G', 'CPD', '2020-01-03 12:34:56.78'), # game-avg + ('0000004', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00', '4G', 'CPD', '2020-01-04 12:34:56.78'), # game-avg + ('0000005', '1001', 'native', 'abcdef3', 'C010', 'ABC-AL00', '4G', 'CPD', '2020-01-05 12:34:56.78'), # game-avg + ('0000007', '1003', 'splash', 'abcdef6', 'C020', 'XYZ-AL00', '4G', 'CPT', '2020-01-07 12:34:56.78'), # reading; only one entry for this keyword so will be excluded. + ('0000008', '1003', 'splash', 'abcdef6', 'C030', 'XYZ-AL00', '4G', 'CPT', '2020-01-08 12:34:56.78'), # shopping; only one entry for this keyword so will be excluded. + ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00', '4G', 'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be included. + ('0000009', '1003', 'splash', 'abcdef6', 'C040', 'XYZ-AL00', '4G', 'CPT', '2020-01-09 12:34:56.78'), # education; just enough entries to be included. + ('0000010', '1003', 'splash', 'abcdef6', 'C050', 'XYZ-AL00', '4G', 'CPT', '2020-01-10 12:34:56.78'), # no mapping; only one entry for this keyword so will be excluded. + ('0000001', '1000', 'native', 'abcde10', 'C020', 'JKL-AL00', '4G', 'CPD', '2020-01-11 12:34:56.78'), # reading; outside the date range. 
+ ] + + schema = StructType([ + StructField("did", StringType(), True), + StructField("adv_id", StringType(), True), + StructField("media", StringType(), True), + StructField("slot_id", StringType(), True), + StructField("spread_app_id", StringType(), True), + StructField("device_name", StringType(), True), + StructField("net_type", StringType(), True), + StructField("price_model", StringType(), True), + StructField("action_time", StringType(), True) + ]) + + return spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + +# Returns a dataframe with the effective keyword data. +def create_effective_keywords(spark): + data = [('travel',), ('game-avg',), ('education',)] + schema = StructType([ + StructField("keyword", StringType(), True) + ]) + return spark.createDataFrame(spark.sparkContext.parallelize(data), schema) # Prints to screen the code to generate the given data frame. diff --git a/Model/lookalike-model/tests/pipeline/test_main_clean.py b/Model/lookalike-model/tests/pipeline/test_main_clean.py index 3102fa3..c302479 100644 --- a/Model/lookalike-model/tests/pipeline/test_main_clean.py +++ b/Model/lookalike-model/tests/pipeline/test_main_clean.py @@ -18,8 +18,8 @@ import unittest import yaml from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext -from pyspark.sql.functions import col -from pyspark.sql.types import IntegerType +from pyspark.sql.functions import col, udf, collect_set +from pyspark.sql.types import IntegerType, BooleanType from lookalike_model.pipeline import main_clean, util from data_generator import * @@ -163,10 +163,12 @@ class TestMainClean(unittest.TestCase): keywords_table = cfg['keywords_table'] showlog_table = cfg['showlog_table_name'] clicklog_table = cfg['clicklog_table_name'] + effective_keywords_table = cfg['pipeline']['main_keywords']['keyword_output_table'] create_persona_table(self.spark, persona_table) create_keywords_table(self.spark, keywords_table) create_clicklog_table(self.spark, 
clicklog_table) create_showlog_table(self.spark, showlog_table) + create_effective_keywords_table(self.spark, effective_keywords_table) # Drop the output tables showlog_output_table = cfg['pipeline']['main_clean']['showlog_output_table'] @@ -184,22 +186,32 @@ class TestMainClean(unittest.TestCase): bucket_num = cfg['pipeline']['main_clean']['did_bucket_num'] df_keywords = util.load_df(self.hive_context, keywords_table) + # run() does filtering on the effective keywords so we need to filter + # the raw logs with the spread app ids when validating the output. + effective_spread_app_ids = ['C000', 'C001', 'C002', 'C003', 'C004', 'C010', 'C011', 'C012', 'C013', 'C014', ] + df_log = create_raw_log(self.spark) + df_log = self.filter_spread_app_ids(df_log, effective_spread_app_ids) + # Validate the cleaned persona table. df_persona = util.load_df(self.hive_context, persona_output_table) self.validate_clean_persona(df_persona, bucket_num) # Validate the cleaned clicklog table. df_clicklog = util.load_df(self.hive_context, clicklog_output_table) - print_df_generator_code(df_clicklog.sort('did')) - df_log = create_raw_log(self.spark) self.validate_cleaned_log(df_clicklog, conditions, df_persona, df_keywords, df_log, bucket_num) + print_df_generator_code(df_clicklog.sort('did')) # Validate the cleaned showlog table. df_showlog = util.load_df(self.hive_context, clicklog_output_table) - print_df_generator_code(df_showlog.sort('did')) - df_log = create_raw_log(self.spark) self.validate_cleaned_log(df_showlog, conditions, df_persona, df_keywords, df_log, bucket_num) + print_df_generator_code(df_showlog.sort('did')) + + def filter_spread_app_ids(self, df, spread_app_ids): + # User defined function to return if the keyword is in the inclusion set. + _udf = udf(lambda x: x in spread_app_ids, BooleanType()) + # Return the filtered dataframe. 
+ return df.filter(_udf(col('spread_app_id'))) #======================================== # Helper methods diff --git a/Model/lookalike-model/tests/pipeline/test_main_keywords.py b/Model/lookalike-model/tests/pipeline/test_main_keywords.py new file mode 100644 index 0000000..6a50389 --- /dev/null +++ b/Model/lookalike-model/tests/pipeline/test_main_keywords.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0.html + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import yaml +from pyspark import SparkContext +from pyspark.sql import SparkSession, HiveContext +from lookalike_model.pipeline import main_keywords, util +from data_generator import * + +class TestMainKeywords(unittest.TestCase): + + def setUp (self): + # Set the log level. + sc = SparkContext.getOrCreate() + sc.setLogLevel('ERROR') + + # Initialize the Spark session + self.spark = SparkSession.builder.appName('unit test').enableHiveSupport().getOrCreate() + self.hive_context = HiveContext(sc) + + def test_run(self): + print('*** Running test_run ***') + with open('pipeline/config_keywords.yml', 'r') as ymlfile: + cfg = yaml.safe_load(ymlfile) + + util.resolve_placeholder(cfg) + + # Create the input tables. 
+ cfg_clean = cfg['pipeline']['main_clean'] + showlog_table = cfg['showlog_table_name'] + keywords_mapping_table = cfg['keywords_table'] + create_keywords_mapping = cfg_clean['create_keywords'] + create_keywords_table(self.spark, keywords_mapping_table) + create_keywords_showlog_table(self.spark, showlog_table) + + # Drop the output tables. + cfg_keywords = cfg['pipeline']['main_keywords'] + keyword_threshold = cfg_keywords['keyword_threshold'] + effective_keywords_table = cfg_keywords['keyword_output_table'] + util.drop_table(self.hive_context, effective_keywords_table) + + start_date, end_date, load_minutes = util.load_batch_config(cfg) + + main_keywords.run(self.hive_context, showlog_table, keywords_mapping_table, create_keywords_mapping, + start_date, end_date, load_minutes, keyword_threshold, effective_keywords_table) + + df_keywords = util.load_df(self.hive_context, effective_keywords_table) + + self.validate_effective_keywords(df_keywords) + + + def validate_effective_keywords(self, df): + # Check the column names. + self.assertTrue('keyword' in df.columns) + + # Expected keywords in the dataframe. + keywords_match = ['education', 'travel', 'game-avg'] + + # Check number of rows. + self.assertEqual(df.count(), len(keywords_match)) + + df.show() + + # Check the values of the rows. + # keywords = df.agg(collect_list('keyword')).collect() + # for value in keywords_match: + # self.assertTrue(value in keywords) + for row in df.collect(): + self.assertIn(row['keyword'], keywords_match) + +# Runs the tests. +if __name__ == '__main__': + # Run the unit tests. 
+ unittest.main() diff --git a/Model/lookalike-model/tests/run_test.sh b/Model/lookalike-model/tests/run_test.sh index e4fdff9..e360cdf 100644 --- a/Model/lookalike-model/tests/run_test.sh +++ b/Model/lookalike-model/tests/run_test.sh @@ -3,6 +3,12 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd $DIR +# test_main_keywords: identifies the keywords that have traffic greater than a set percentage. +if false +then + spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_keywords.py +fi + # test_main_clean: preparing cleaned persona, click and show logs data. if false then @@ -15,8 +21,21 @@ then spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_logs.py fi -# test_main_logs: merges click and show log data. -if true +# test_main_trainready: aggregates the click and show log data by user. +if false then spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict pipeline/test_main_trainready.py fi + +# test_score_matrix_table: Converts the score vector table into matrices. +if true +then + spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict application/pipeline/test_score_matrix_table.py +fi + +# test_top_n_similarity_table_generator: Finds the top n users similar to the given user.
+if false +then + spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict application/pipeline/test_top_n_similarity_table_generator_1.py + spark-submit --master yarn --num-executors 5 --executor-cores 2 --conf spark.hadoop.hive.exec.dynamic.partition=true --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict application/pipeline/test_top_n_similarity_table_generator_2.py +fi